diff --git a/README-zh.md b/README-zh.md
index 24916e0e..6822bdae 100644
--- a/README-zh.md
+++ b/README-zh.md
@@ -4,6 +4,7 @@
## 🎉 新闻
+- [X] [2025.06.05]🎯📢LightRAG现已集成MinerU,支持多模态文档解析与RAG(PDF、图片、Office、表格、公式等)。详见下方多模态处理模块。
- [X] [2025.03.18]🎯📢LightRAG现已支持引文功能。
- [X] [2025.02.05]🎯📢我们团队发布了[VideoRAG](https://github.com/HKUDS/VideoRAG),用于理解超长上下文视频。
- [X] [2025.01.13]🎯📢我们团队发布了[MiniRAG](https://github.com/HKUDS/MiniRAG),使用小型模型简化RAG。
@@ -1002,6 +1003,32 @@ rag.merge_entities(
+## 多模态文档处理(MinerU集成)
+
+LightRAG 现已支持通过 [MinerU](https://github.com/opendatalab/MinerU) 实现多模态文档解析与检索增强生成(RAG)。您可以从 PDF、图片、Office 文档中提取结构化内容(文本、图片、表格、公式等),并在 RAG 流程中使用。
+
+**主要特性:**
+- 支持解析 PDF、图片、DOC/DOCX/PPT/PPTX 等多种格式
+- 提取并索引文本、图片、表格、公式及文档结构
+- 在 RAG 中查询和检索多模态内容(文本、图片、表格、公式)
+- 与 LightRAG Core 及 RAGAnything 无缝集成
+
+**快速开始:**
+1. 安装依赖:
+ ```bash
+ pip install "magic-pdf[full]>=1.2.2" huggingface_hub
+ ```
+2. 下载 MinerU 模型权重(详见 [MinerU 集成指南](docs/mineru_integration_zh.md))
+3. 使用新版 `MineruParser` 或 RAGAnything 的 `process_document_complete` 处理文件:
+ ```python
+ from lightrag.mineru_parser import MineruParser
+ content_list, md_content = MineruParser.parse_pdf('path/to/document.pdf', 'output_dir')
+ # 或自动识别类型:
+ content_list, md_content = MineruParser.parse_document('path/to/file', 'auto', 'output_dir')
+ ```
+4. 使用 LightRAG 查询多模态内容的方法,请参见 [docs/mineru_integration_zh.md](docs/mineru_integration_zh.md)。
+
+
## Token统计功能
diff --git a/README.md b/README.md
index b9d6a31c..795901bb 100644
--- a/README.md
+++ b/README.md
@@ -39,7 +39,7 @@
## 🎉 News
-
+- [X] [2025.06.05]🎯📢LightRAG now supports multimodal document parsing and RAG with MinerU integration (PDF, images, Office, tables, formulas, etc.). See the new multimodal section below.
- [X] [2025.03.18]🎯📢LightRAG now supports citation functionality, enabling proper source attribution.
- [X] [2025.02.05]🎯📢Our team has released [VideoRAG](https://github.com/HKUDS/VideoRAG) understanding extremely long-context videos.
- [X] [2025.01.13]🎯📢Our team has released [MiniRAG](https://github.com/HKUDS/MiniRAG) making RAG simpler with small models.
@@ -1051,6 +1051,31 @@ When merging entities:
+## Multimodal Document Processing (MinerU Integration)
+
+LightRAG now supports multimodal document parsing and retrieval-augmented generation (RAG) via [MinerU](https://github.com/opendatalab/MinerU). You can extract structured content (text, images, tables, formulas, etc.) from PDF, images, and Office documents, and use them in your RAG pipeline.
+
+**Key Features:**
+- Parse PDFs, images, DOC/DOCX/PPT/PPTX, and more
+- Extract and index text, images, tables, formulas, and document structure
+- Query and retrieve multimodal content (text, image, table, formula) in RAG
+- Seamless integration with LightRAG core and RAGAnything
+
+**Quick Start:**
+1. Install dependencies:
+ ```bash
+ pip install "magic-pdf[full]>=1.2.2" huggingface_hub
+ ```
+2. Download MinerU model weights (see [MinerU Integration Guide](docs/mineru_integration_en.md))
+3. Use the new `MineruParser` or RAGAnything's `process_document_complete` to process files:
+ ```python
+ from lightrag.mineru_parser import MineruParser
+ content_list, md_content = MineruParser.parse_pdf('path/to/document.pdf', 'output_dir')
+ # or for any file type:
+ content_list, md_content = MineruParser.parse_document('path/to/file', 'auto', 'output_dir')
+ ```
+4. To query multimodal content with LightRAG, see [docs/mineru_integration_en.md](docs/mineru_integration_en.md).
+
## Token Usage Tracking
diff --git a/docs/mineru_integration_en.md b/docs/mineru_integration_en.md
new file mode 100644
index 00000000..bd74e4df
--- /dev/null
+++ b/docs/mineru_integration_en.md
@@ -0,0 +1,246 @@
+# MinerU Integration Guide
+
+### About MinerU
+
+MinerU is a powerful open-source tool for extracting high-quality structured data from PDF, image, and Office documents. It provides the following features:
+
+- Text extraction while preserving document structure (headings, paragraphs, lists, etc.)
+- Handling complex layouts including multi-column formats
+- Automatic formula recognition and conversion to LaTeX format
+- Image, table, and footnote extraction
+- Automatic scanned document detection and OCR application
+- Support for multiple output formats (Markdown, JSON)
+
+### Installation
+
+#### Installing MinerU Dependencies
+
+If you have already installed LightRAG without MinerU support, you can add it by installing the magic-pdf package directly:
+
+```bash
+pip install "magic-pdf[full]>=1.2.2" huggingface_hub
+```
+
+These are the MinerU-related dependencies required by LightRAG.
+
+#### MinerU Model Weights
+
+MinerU requires model weight files to function properly. After installation, you need to download the required model weights. You can use either Hugging Face or ModelScope to download the models.
+
+##### Option 1: Download from Hugging Face
+
+```bash
+pip install huggingface_hub
+wget https://github.com/opendatalab/MinerU/raw/master/scripts/download_models_hf.py -O download_models_hf.py
+python download_models_hf.py
+```
+
+##### Option 2: Download from ModelScope (Recommended for users in China)
+
+```bash
+pip install modelscope
+wget https://github.com/opendatalab/MinerU/raw/master/scripts/download_models.py -O download_models.py
+python download_models.py
+```
+
+Both methods will automatically download the model files and configure the model directory in the configuration file. The configuration file is located in your user directory and named `magic-pdf.json`.
+
+> **Note for Windows users**: User directory is at `C:\Users\username`
+> **Note for Linux users**: User directory is at `/home/username`
+> **Note for macOS users**: User directory is at `/Users/username`
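+
+As a quick sanity check, the snippet below prints where the configuration file is expected and whether it exists (a minimal sketch; the home-directory location follows the download scripts' behavior described above):
+
+```python
+from pathlib import Path
+
+# magic-pdf.json is written to the user's home directory by the download scripts
+config_path = Path.home() / "magic-pdf.json"
+print(f"Expected config: {config_path} (exists: {config_path.exists()})")
+```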
+
+#### Optional: LibreOffice Installation
+
+To process Office documents (DOC, DOCX, PPT, PPTX), you need to install LibreOffice:
+
+**Linux/macOS:**
+```bash
+# Use your system's package manager, e.g.:
+sudo apt-get install libreoffice   # Debian/Ubuntu
+sudo yum install libreoffice       # RHEL/CentOS
+brew install --cask libreoffice    # macOS (Homebrew)
+```
+
+**Windows:**
+1. Install LibreOffice
+2. Add the installation directory to your PATH: `install_dir\LibreOffice\program`
+
+### Using MinerU Parser
+
+#### Basic Usage
+
+```python
+from lightrag.mineru_parser import MineruParser
+
+# Parse a PDF document
+content_list, md_content = MineruParser.parse_pdf('path/to/document.pdf', 'output_dir')
+
+# Parse an image
+content_list, md_content = MineruParser.parse_image('path/to/image.jpg', 'output_dir')
+
+# Parse an Office document
+content_list, md_content = MineruParser.parse_office_doc('path/to/document.docx', 'output_dir')
+
+# Auto-detect and parse any supported document type
+content_list, md_content = MineruParser.parse_document('path/to/file', 'auto', 'output_dir')
+```
+
+#### RAGAnything Integration
+
+In RAGAnything, you can directly use file paths as input to the `process_document_complete` method to process documents. Here's a complete configuration example:
+
+```python
+from lightrag.llm.openai import openai_complete_if_cache, openai_embed
+from lightrag.raganything import RAGAnything
+
+
+# Initialize RAGAnything
+rag = RAGAnything(
+ working_dir="./rag_storage", # Working directory
+ llm_model_func=lambda prompt, system_prompt=None, history_messages=[], **kwargs: openai_complete_if_cache(
+ "gpt-4o-mini", # Model to use
+ prompt,
+ system_prompt=system_prompt,
+ history_messages=history_messages,
+ api_key="your-api-key", # Replace with your API key
+ base_url="your-base-url", # Replace with your API base URL
+ **kwargs,
+ ),
+ vision_model_func=lambda prompt, system_prompt=None, history_messages=[], image_data=None, **kwargs: openai_complete_if_cache(
+ "gpt-4o", # Vision model
+ "",
+ system_prompt=None,
+ history_messages=[],
+ messages=[
+ {"role": "system", "content": system_prompt} if system_prompt else None,
+ {"role": "user", "content": [
+ {"type": "text", "text": prompt},
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:image/jpeg;base64,{image_data}"
+ }
+ }
+ ]} if image_data else {"role": "user", "content": prompt}
+ ],
+ api_key="your-api-key", # Replace with your API key
+ base_url="your-base-url", # Replace with your API base URL
+ **kwargs,
+ ) if image_data else openai_complete_if_cache(
+ "gpt-4o-mini",
+ prompt,
+ system_prompt=system_prompt,
+ history_messages=history_messages,
+ api_key="your-api-key", # Replace with your API key
+ base_url="your-base-url", # Replace with your API base URL
+ **kwargs,
+ ),
+ embedding_func=lambda texts: openai_embed(
+ texts,
+ model="text-embedding-3-large",
+ api_key="your-api-key", # Replace with your API key
+ base_url="your-base-url", # Replace with your API base URL
+ ),
+ embedding_dim=3072,
+ max_token_size=8192
+)
+
+# Process a single file
+await rag.process_document_complete(
+ file_path="path/to/document.pdf",
+ output_dir="./output",
+ parse_method="auto"
+)
+
+# Query the processed document
+result = await rag.query_with_multimodal(
+ "What is the main content of the document?",
+ mode="hybrid"
+)
+
+```
+
+MinerU categorizes document content into text, formulas, images, and tables, processing each with its corresponding ingestion type:
+- Text content: `ingestion_type='text'`
+- Image content: `ingestion_type='image'`
+- Table content: `ingestion_type='table'`
+- Formula content: `ingestion_type='equation'`
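+
+This categorization can be sketched as a small dispatch helper (illustrative only; it assumes each `content_list` item carries a `type` field, as in MinerU's `content_list.json` output, and falls back to text for unknown types):
+
+```python
+# Map MinerU content-block types to their ingestion types
+TYPE_TO_INGESTION = {
+    "text": "text",
+    "image": "image",
+    "table": "table",
+    "equation": "equation",
+}
+
+
+def ingestion_type(item: dict) -> str:
+    """Return the ingestion type for a parsed content block (default: text)."""
+    return TYPE_TO_INGESTION.get(item.get("type", "text"), "text")
+```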
+
+#### Query Examples
+
+Here are some common query examples:
+
+```python
+# Query text content
+result = await rag.query_with_multimodal(
+ "What is the main topic of the document?",
+ mode="hybrid"
+)
+
+# Query image-related content
+result = await rag.query_with_multimodal(
+ "Describe the images and figures in the document",
+ mode="hybrid"
+)
+
+# Query table-related content
+result = await rag.query_with_multimodal(
+ "Tell me about the experimental results and data tables",
+ mode="hybrid"
+)
+```
+
+#### Command Line Tool
+
+We also provide a command-line tool for document parsing:
+
+```bash
+python examples/mineru_example.py path/to/document.pdf
+```
+
+Optional parameters:
+- `--output` or `-o`: Specify output directory
+- `--method` or `-m`: Choose parsing method (auto, ocr, txt)
+- `--stats`: Display content statistics
+
+### Output Format
+
+MinerU generates three main output files for each parsed document:
+
+1. `{filename}.md` - Markdown representation of the document
+2. `{filename}_content_list.json` - Structured JSON content
+3. `{filename}_model.json` - Detailed model parsing results
+
+The `content_list.json` file contains all structured content extracted from the document, including:
+- Text blocks (body text, headings, etc.)
+- Images (paths and optional captions)
+- Tables (table content and optional captions)
+- Lists
+- Formulas
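+
+For a quick look at what was extracted, a small helper can tally the blocks by type (a sketch; it only assumes the `type` field described above):
+
+```python
+import json
+from collections import Counter
+
+
+def summarize_content_list(path: str) -> Counter:
+    """Count content blocks in a MinerU content_list.json by their type."""
+    with open(path, encoding="utf-8") as f:
+        items = json.load(f)
+    return Counter(item.get("type", "unknown") for item in items)
+```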
+
+### Troubleshooting
+
+If you encounter issues with MinerU:
+
+1. Check that model weights are correctly downloaded
+2. Ensure you have sufficient RAM (16GB+ recommended)
+3. For CUDA acceleration issues, see [MinerU documentation](https://mineru.readthedocs.io/en/latest/additional_notes/faq.html)
+4. If parsing Office documents fails, verify LibreOffice is properly installed
+5. If you encounter `pickle.UnpicklingError: invalid load key, 'v'.`, it might be due to an incomplete model download. Try re-downloading the models.
+6. For users with newer graphics cards (H100, etc.) and garbled OCR text, try upgrading the CUDA version used by Paddle:
+ ```bash
+ pip install paddlepaddle-gpu==3.0.0b1 -i https://www.paddlepaddle.org.cn/packages/stable/cu123/
+ ```
+7. If you encounter a "filename too long" error, the latest version of MineruParser includes logic to automatically handle this issue.
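+
+Under the hood, `MineruParser.safe_write` guards against over-long names by truncating the base name while keeping the extension; the idea in isolation looks roughly like this (a sketch of the logic, not the exact implementation):
+
+```python
+import os
+
+
+def truncate_filename(filename: str, max_len: int = 200) -> str:
+    """Shorten over-long filenames while preserving the extension."""
+    if len(filename) <= max_len:
+        return filename
+    base, ext = os.path.splitext(filename)
+    return base[:190] + ext  # leave margin below typical 255-char filesystem limits
+```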
+
+#### Updating Existing Models
+
+If you have previously downloaded models and need to update them, you can simply run the download script again. The script will update the model directory to the latest version.
+
+### Advanced Configuration
+
+The MinerU configuration file `magic-pdf.json` supports various customization options, including:
+
+- Model directory path
+- OCR engine selection
+- GPU acceleration settings
+- Cache settings
+
+For complete configuration options, refer to the [MinerU official documentation](https://mineru.readthedocs.io/).
\ No newline at end of file
diff --git a/docs/mineru_integration_zh.md b/docs/mineru_integration_zh.md
new file mode 100644
index 00000000..901fa882
--- /dev/null
+++ b/docs/mineru_integration_zh.md
@@ -0,0 +1,245 @@
+# MinerU 集成指南
+
+### 关于 MinerU
+
+MinerU 是一个强大的开源工具,用于从 PDF、图像和 Office 文档中提取高质量的结构化数据。它提供以下功能:
+
+- 保留文档结构(标题、段落、列表等)的文本提取
+- 处理包括多列格式在内的复杂布局
+- 自动识别并将公式转换为 LaTeX 格式
+- 提取图像、表格和脚注
+- 自动检测扫描文档并应用 OCR
+- 支持多种输出格式(Markdown、JSON)
+
+### 安装
+
+#### 安装 MinerU 依赖
+
+如果您已经安装了 LightRAG,但没有 MinerU 支持,您可以通过安装 magic-pdf 包来直接添加 MinerU 支持:
+
+```bash
+pip install "magic-pdf[full]>=1.2.2" huggingface_hub
+```
+
+这些是 LightRAG 所需的 MinerU 相关依赖项。
+
+#### MinerU 模型权重
+
+MinerU 需要模型权重文件才能正常运行。安装后,您需要下载所需的模型权重。您可以使用 Hugging Face 或 ModelScope 下载模型。
+
+##### 选项 1:从 Hugging Face 下载
+
+```bash
+pip install huggingface_hub
+wget https://github.com/opendatalab/MinerU/raw/master/scripts/download_models_hf.py -O download_models_hf.py
+python download_models_hf.py
+```
+
+##### 选项 2:从 ModelScope 下载(推荐中国用户使用)
+
+```bash
+pip install modelscope
+wget https://github.com/opendatalab/MinerU/raw/master/scripts/download_models.py -O download_models.py
+python download_models.py
+```
+
+两种方法都会自动下载模型文件并在配置文件中配置模型目录。配置文件位于用户目录中,名为 `magic-pdf.json`。
+
+> **Windows 用户注意**:用户目录位于 `C:\Users\用户名`
+> **Linux 用户注意**:用户目录位于 `/home/用户名`
+> **macOS 用户注意**:用户目录位于 `/Users/用户名`
+
+#### 可选:安装 LibreOffice
+
+要处理 Office 文档(DOC、DOCX、PPT、PPTX),您需要安装 LibreOffice:
+
+**Linux/macOS:**
+```bash
+# 使用您系统的包管理器,例如:
+sudo apt-get install libreoffice   # Debian/Ubuntu
+sudo yum install libreoffice       # RHEL/CentOS
+brew install --cask libreoffice    # macOS(Homebrew)
+```
+
+**Windows:**
+1. 安装 LibreOffice
+2. 将安装目录添加到 PATH 环境变量:`安装目录\LibreOffice\program`
+
+### 使用 MinerU 解析器
+
+#### 基本用法
+
+```python
+from lightrag.mineru_parser import MineruParser
+
+# 解析 PDF 文档
+content_list, md_content = MineruParser.parse_pdf('path/to/document.pdf', 'output_dir')
+
+# 解析图像
+content_list, md_content = MineruParser.parse_image('path/to/image.jpg', 'output_dir')
+
+# 解析 Office 文档
+content_list, md_content = MineruParser.parse_office_doc('path/to/document.docx', 'output_dir')
+
+# 自动检测并解析任何支持的文档类型
+content_list, md_content = MineruParser.parse_document('path/to/file', 'auto', 'output_dir')
+```
+
+#### RAGAnything 集成
+
+在 RAGAnything 中,您可以直接使用文件路径作为 `process_document_complete` 方法的输入来处理文档。以下是一个完整的配置示例:
+
+```python
+from lightrag.llm.openai import openai_complete_if_cache, openai_embed
+from lightrag.raganything import RAGAnything
+
+
+# 初始化 RAGAnything
+rag = RAGAnything(
+ working_dir="./rag_storage", # 工作目录
+ llm_model_func=lambda prompt, system_prompt=None, history_messages=[], **kwargs: openai_complete_if_cache(
+ "gpt-4o-mini", # 使用的模型
+ prompt,
+ system_prompt=system_prompt,
+ history_messages=history_messages,
+ api_key="your-api-key", # 替换为您的 API 密钥
+ base_url="your-base-url", # 替换为您的 API 基础 URL
+ **kwargs,
+ ),
+ vision_model_func=lambda prompt, system_prompt=None, history_messages=[], image_data=None, **kwargs: openai_complete_if_cache(
+ "gpt-4o", # 视觉模型
+ "",
+ system_prompt=None,
+ history_messages=[],
+ messages=[
+ {"role": "system", "content": system_prompt} if system_prompt else None,
+ {"role": "user", "content": [
+ {"type": "text", "text": prompt},
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:image/jpeg;base64,{image_data}"
+ }
+ }
+ ]} if image_data else {"role": "user", "content": prompt}
+ ],
+ api_key="your-api-key", # 替换为您的 API 密钥
+ base_url="your-base-url", # 替换为您的 API 基础 URL
+ **kwargs,
+ ) if image_data else openai_complete_if_cache(
+ "gpt-4o-mini",
+ prompt,
+ system_prompt=system_prompt,
+ history_messages=history_messages,
+ api_key="your-api-key", # 替换为您的 API 密钥
+ base_url="your-base-url", # 替换为您的 API 基础 URL
+ **kwargs,
+ ),
+ embedding_func=lambda texts: openai_embed(
+ texts,
+ model="text-embedding-3-large",
+ api_key="your-api-key", # 替换为您的 API 密钥
+ base_url="your-base-url", # 替换为您的 API 基础 URL
+ ),
+ embedding_dim=3072,
+ max_token_size=8192
+)
+
+# 处理单个文件
+await rag.process_document_complete(
+ file_path="path/to/document.pdf",
+ output_dir="./output",
+ parse_method="auto"
+)
+
+# 查询处理后的文档
+result = await rag.query_with_multimodal(
+ "What is the main content of the document?",
+ mode="hybrid"
+)
+```
+
+MinerU 会将文档内容分类为文本、公式、图像和表格,分别使用相应的摄入类型进行处理:
+- 文本内容:`ingestion_type='text'`
+- 图像内容:`ingestion_type='image'`
+- 表格内容:`ingestion_type='table'`
+- 公式内容:`ingestion_type='equation'`
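+
+上述映射可以用一个简单的分发函数来表示(仅作示意;假设 `content_list` 中的每个条目都带有 `type` 字段,与 MinerU 的 `content_list.json` 输出一致,未知类型回退为文本):
+
+```python
+# 将 MinerU 内容块类型映射到摄入类型
+TYPE_TO_INGESTION = {
+    "text": "text",
+    "image": "image",
+    "table": "table",
+    "equation": "equation",
+}
+
+
+def ingestion_type(item: dict) -> str:
+    """返回解析内容块对应的摄入类型(默认为 text)"""
+    return TYPE_TO_INGESTION.get(item.get("type", "text"), "text")
+```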
+
+#### 查询示例
+
+以下是一些常见的查询示例:
+
+```python
+# 查询文本内容
+result = await rag.query_with_multimodal(
+ "What is the main topic of the document?",
+ mode="hybrid"
+)
+
+# 查询图片相关内容
+result = await rag.query_with_multimodal(
+ "Describe the images and figures in the document",
+ mode="hybrid"
+)
+
+# 查询表格相关内容
+result = await rag.query_with_multimodal(
+ "Tell me about the experimental results and data tables",
+ mode="hybrid"
+)
+```
+
+#### 命令行工具
+
+我们还提供了一个用于文档解析的命令行工具:
+
+```bash
+python examples/mineru_example.py path/to/document.pdf
+```
+
+可选参数:
+- `--output` 或 `-o`:指定输出目录
+- `--method` 或 `-m`:选择解析方法(auto、ocr、txt)
+- `--stats`:显示内容统计信息
+
+### 输出格式
+
+MinerU 为每个解析的文档生成三个文件:
+
+1. `{文件名}.md` - 文档的 Markdown 表示
+2. `{文件名}_content_list.json` - 结构化 JSON 内容
+3. `{文件名}_model.json` - 详细的模型解析结果
+
+`content_list.json` 文件包含从文档中提取的所有结构化内容,包括:
+- 文本块(正文、标题等)
+- 图像(路径和可选的标题)
+- 表格(表格内容和可选的标题)
+- 列表
+- 公式
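+
+可以用一个小工具函数快速检查提取结果(示意代码;仅假设存在上文描述的 `type` 字段):
+
+```python
+import json
+from collections import Counter
+
+
+def summarize_content_list(path: str) -> Counter:
+    """按 type 统计 MinerU content_list.json 中的内容块数量"""
+    with open(path, encoding="utf-8") as f:
+        items = json.load(f)
+    return Counter(item.get("type", "unknown") for item in items)
+```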
+
+### 疑难解答
+
+如果您在使用 MinerU 时遇到问题:
+
+1. 检查模型权重是否正确下载
+2. 确保有足够的内存(建议 16GB+)
+3. 对于 CUDA 加速问题,请参阅 [MinerU 文档](https://mineru.readthedocs.io/en/latest/additional_notes/faq.html)
+4. 如果解析 Office 文档失败,请验证 LibreOffice 是否正确安装
+5. 如果遇到 `pickle.UnpicklingError: invalid load key, 'v'.`,可能是因为模型下载不完整。尝试重新下载模型。
+6. 对于使用较新显卡(H100 等)并出现 OCR 文本乱码的用户,请尝试升级 Paddle 使用的 CUDA 版本:
+ ```bash
+ pip install paddlepaddle-gpu==3.0.0b1 -i https://www.paddlepaddle.org.cn/packages/stable/cu123/
+ ```
+7. 如果遇到 "文件名太长" 错误,最新版本的 MineruParser 已经包含了自动处理此问题的逻辑。
+
+#### 更新现有模型
+
+如果您之前已经下载了模型并需要更新它们,只需再次运行下载脚本即可。脚本将更新模型目录到最新版本。
+
+### 高级配置
+
+MinerU 配置文件 `magic-pdf.json` 支持多种自定义选项,包括:
+
+- 模型目录路径
+- OCR 引擎选择
+- GPU 加速设置
+- 缓存设置
+
+有关完整的配置选项,请参阅 [MinerU 官方文档](https://mineru.readthedocs.io/)。
\ No newline at end of file
diff --git a/examples/mineru_example.py b/examples/mineru_example.py
new file mode 100644
index 00000000..7d6dc87c
--- /dev/null
+++ b/examples/mineru_example.py
@@ -0,0 +1,82 @@
+#!/usr/bin/env python
+"""
+Example script demonstrating the basic usage of MinerU parser
+
+This example shows how to:
+1. Parse different types of documents (PDF, images, office documents)
+2. Use different parsing methods
+3. Display document statistics
+"""
+
+import os
+import argparse
+from typing import Optional
+
+from lightrag.mineru_parser import MineruParser
+
+
+def parse_document(file_path: str, output_dir: Optional[str] = None, method: str = "auto", stats: bool = False):
+ """
+ Parse a document using MinerU parser
+
+ Args:
+ file_path: Path to the document
+ output_dir: Output directory for parsed results
+ method: Parsing method (auto, ocr, txt)
+ stats: Whether to display content statistics
+ """
+ try:
+ # Parse the document
+ content_list, md_content = MineruParser.parse_document(
+ file_path=file_path,
+ parse_method=method,
+ output_dir=output_dir
+ )
+
+ # Display statistics if requested
+ if stats:
+ print("\nDocument Statistics:")
+ print(f"Total content blocks: {len(content_list)}")
+
+ # Count different types of content
+ content_types = {}
+ for item in content_list:
+ content_type = item.get('type', 'unknown')
+ content_types[content_type] = content_types.get(content_type, 0) + 1
+
+ print("\nContent Type Distribution:")
+ for content_type, count in content_types.items():
+ print(f"- {content_type}: {count}")
+
+ return content_list, md_content
+
+ except Exception as e:
+ print(f"Error parsing document: {str(e)}")
+ return None, None
+
+def main():
+ """Main function to run the example"""
+ parser = argparse.ArgumentParser(description='MinerU Parser Example')
+ parser.add_argument('file_path', help='Path to the document to parse')
+ parser.add_argument('--output', '-o', help='Output directory path')
+ parser.add_argument('--method', '-m',
+ choices=['auto', 'ocr', 'txt'],
+ default='auto',
+ help='Parsing method (auto, ocr, txt)')
+ parser.add_argument('--stats', action='store_true',
+ help='Display content statistics')
+
+ args = parser.parse_args()
+
+ # Create output directory if specified
+ if args.output:
+ os.makedirs(args.output, exist_ok=True)
+
+    # Parse the document (statistics, if requested, are printed inside parse_document)
+    parse_document(
+        args.file_path,
+        args.output,
+        args.method,
+        args.stats
+    )
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
diff --git a/examples/raganything_example.py b/examples/raganything_example.py
new file mode 100644
index 00000000..8f56c81d
--- /dev/null
+++ b/examples/raganything_example.py
@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+"""
+Example script demonstrating the integration of MinerU parser with RAGAnything
+
+This example shows how to:
+1. Process parsed documents with RAGAnything
+2. Perform multimodal queries on the processed documents
+3. Handle different types of content (text, images, tables)
+"""
+
+import os
+import argparse
+import asyncio
+
+from lightrag.llm.openai import openai_complete_if_cache, openai_embed
+from lightrag.raganything import RAGAnything
+
+async def process_with_rag(file_path: str, output_dir: str, api_key: str, base_url: str = None, working_dir: str = None):
+ """
+ Process document with RAGAnything
+
+ Args:
+ file_path: Path to the document
+ output_dir: Output directory for RAG results
+ api_key: OpenAI API key
+        base_url: Optional base URL for API
+        working_dir: Working directory for RAG storage
+    """
+ try:
+ # Initialize RAGAnything
+ rag = RAGAnything(
+ working_dir=working_dir,
+ llm_model_func=lambda prompt, system_prompt=None, history_messages=[], **kwargs: openai_complete_if_cache(
+ "gpt-4o-mini",
+ prompt,
+ system_prompt=system_prompt,
+ history_messages=history_messages,
+ api_key=api_key,
+ base_url=base_url,
+ **kwargs,
+ ),
+ vision_model_func=lambda prompt, system_prompt=None, history_messages=[], image_data=None, **kwargs: openai_complete_if_cache(
+ "gpt-4o",
+ "",
+ system_prompt=None,
+ history_messages=[],
+ messages=[
+ {"role": "system", "content": system_prompt} if system_prompt else None,
+ {"role": "user", "content": [
+ {"type": "text", "text": prompt},
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:image/jpeg;base64,{image_data}"
+ }
+ }
+ ]} if image_data else {"role": "user", "content": prompt}
+ ],
+ api_key=api_key,
+ base_url=base_url,
+ **kwargs,
+ ) if image_data else openai_complete_if_cache(
+ "gpt-4o-mini",
+ prompt,
+ system_prompt=system_prompt,
+ history_messages=history_messages,
+ api_key=api_key,
+ base_url=base_url,
+ **kwargs,
+ ),
+ embedding_func=lambda texts: openai_embed(
+ texts,
+ model="text-embedding-3-large",
+ api_key=api_key,
+ base_url=base_url,
+ ),
+ embedding_dim=3072,
+ max_token_size=8192
+ )
+
+ # Process document
+ await rag.process_document_complete(
+ file_path=file_path,
+ output_dir=output_dir,
+ parse_method="auto"
+ )
+
+ # Example queries
+ queries = [
+ "What is the main content of the document?",
+ "Describe the images and figures in the document",
+ "Tell me about the experimental results and data tables"
+ ]
+
+ print("\nQuerying processed document:")
+ for query in queries:
+ print(f"\nQuery: {query}")
+ result = await rag.query_with_multimodal(query, mode="hybrid")
+ print(f"Answer: {result}")
+
+ except Exception as e:
+ print(f"Error processing with RAG: {str(e)}")
+
+def main():
+ """Main function to run the example"""
+ parser = argparse.ArgumentParser(description='MinerU RAG Example')
+ parser.add_argument('file_path', help='Path to the document to process')
+ parser.add_argument('--working_dir', '-w', default="./rag_storage", help='Working directory path')
+ parser.add_argument('--output', '-o', default="./output", help='Output directory path')
+ parser.add_argument('--api-key', required=True, help='OpenAI API key for RAG processing')
+ parser.add_argument('--base-url', help='Optional base URL for API')
+
+ args = parser.parse_args()
+
+ # Create output directory if specified
+ if args.output:
+ os.makedirs(args.output, exist_ok=True)
+
+ # Process with RAG
+ asyncio.run(process_with_rag(
+ args.file_path,
+ args.output,
+ args.api_key,
+ args.base_url,
+ args.working_dir
+ ))
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
diff --git a/lightrag/mineru_parser.py b/lightrag/mineru_parser.py
new file mode 100644
index 00000000..52bc4962
--- /dev/null
+++ b/lightrag/mineru_parser.py
@@ -0,0 +1,454 @@
+# type: ignore
+"""
+MinerU Document Parser Utility
+
+This module provides functionality for parsing PDF, image and office documents using MinerU library,
+and converts the parsing results into markdown and JSON formats
+"""
+
+from __future__ import annotations
+
+__all__ = ["MineruParser"]
+
+import os
+import json
+import argparse
+from pathlib import Path
+from typing import Dict, List, Optional, Union, Tuple, Any, TypeVar, cast, ClassVar
+
+# Stubs for magic_pdf result types that have no importable class
+InferResult = Any
+PipeResult = Any
+
+# MinerU imports (identical under type checking and at runtime, so no
+# TYPE_CHECKING branch is needed)
+from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
+from magic_pdf.data.dataset import PymuDocDataset
+from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
+from magic_pdf.config.enums import SupportedPdfParseMethod
+from magic_pdf.data.read_api import read_local_office, read_local_images
+
+T = TypeVar('T')
+
+class MineruParser:
+ """
+ MinerU document parsing utility class
+
+ Supports parsing PDF, image and office documents (like Word, PPT, etc.),
+ converting the content into structured data and generating markdown and JSON output
+ """
+
+ __slots__: ClassVar[Tuple[str, ...]] = ()
+
+ def __init__(self) -> None:
+ """Initialize MineruParser"""
+ pass
+
+ @staticmethod
+ def safe_write(writer: Any, content: Union[str, bytes, Dict[str, Any], List[Any]], filename: str) -> None:
+ """
+ Safely write content to a file, ensuring the filename is valid
+
+ Args:
+ writer: The writer object to use
+ content: The content to write
+ filename: The filename to write to
+ """
+ # Ensure the filename isn't too long
+ if len(filename) > 200: # Most filesystems have limits around 255 characters
+ # Truncate the filename while keeping the extension
+ base, ext = os.path.splitext(filename)
+ filename = base[:190] + ext # Leave room for the extension and some margin
+
+ # Handle specific content types
+ if isinstance(content, str):
+ # Ensure str content is encoded to bytes if required
+ try:
+ writer.write(content, filename)
+ except TypeError:
+ # If the writer expects bytes, convert string to bytes
+ writer.write(content.encode('utf-8'), filename)
+ else:
+ # For dict/list content, always encode as JSON string first
+ if isinstance(content, (dict, list)):
+ try:
+ writer.write(json.dumps(content, ensure_ascii=False, indent=4), filename)
+ except TypeError:
+ # If the writer expects bytes, convert JSON string to bytes
+ writer.write(json.dumps(content, ensure_ascii=False, indent=4).encode('utf-8'), filename)
+ else:
+ # Regular content (assumed to be bytes or compatible)
+ writer.write(content, filename)
+
+ @staticmethod
+ def parse_pdf(
+ pdf_path: Union[str, Path],
+ output_dir: Optional[str] = None,
+ use_ocr: bool = False
+ ) -> Tuple[List[Dict[str, Any]], str]:
+ """
+ Parse PDF document
+
+ Args:
+ pdf_path: Path to the PDF file
+ output_dir: Output directory path
+ use_ocr: Whether to force OCR parsing
+
+ Returns:
+ Tuple[List[Dict[str, Any]], str]: Tuple containing (content list JSON, Markdown text)
+ """
+ try:
+ # Convert to Path object for easier handling
+ pdf_path = Path(pdf_path)
+ name_without_suff = pdf_path.stem
+
+ # Prepare output directories - ensure file name is in path
+ if output_dir:
+ base_output_dir = Path(output_dir)
+ local_md_dir = base_output_dir / name_without_suff
+ else:
+ local_md_dir = pdf_path.parent / name_without_suff
+
+ local_image_dir = local_md_dir / "images"
+ image_dir = local_image_dir.name
+
+ # Create directories
+ os.makedirs(local_image_dir, exist_ok=True)
+ os.makedirs(local_md_dir, exist_ok=True)
+
+ # Initialize writers and reader
+ image_writer = FileBasedDataWriter(str(local_image_dir)) # type: ignore
+ md_writer = FileBasedDataWriter(str(local_md_dir)) # type: ignore
+ reader = FileBasedDataReader("") # type: ignore
+
+ # Read PDF bytes
+ pdf_bytes = reader.read(str(pdf_path)) # type: ignore
+
+ # Create dataset instance
+ ds = PymuDocDataset(pdf_bytes) # type: ignore
+
+ # Process based on PDF type and user preference
+ if use_ocr or ds.classify() == SupportedPdfParseMethod.OCR: # type: ignore
+ infer_result = ds.apply(doc_analyze, ocr=True) # type: ignore
+ pipe_result = infer_result.pipe_ocr_mode(image_writer) # type: ignore
+ else:
+ infer_result = ds.apply(doc_analyze, ocr=False) # type: ignore
+ pipe_result = infer_result.pipe_txt_mode(image_writer) # type: ignore
+
+ # Draw visualizations
+ try:
+ infer_result.draw_model(os.path.join(local_md_dir, f"{name_without_suff}_model.pdf")) # type: ignore
+ pipe_result.draw_layout(os.path.join(local_md_dir, f"{name_without_suff}_layout.pdf")) # type: ignore
+ pipe_result.draw_span(os.path.join(local_md_dir, f"{name_without_suff}_spans.pdf")) # type: ignore
+ except Exception as e:
+ print(f"Warning: Failed to draw visualizations: {str(e)}")
+
+ # Get data using API methods
+ md_content = pipe_result.get_markdown(image_dir) # type: ignore
+ content_list = pipe_result.get_content_list(image_dir) # type: ignore
+
+ # Save files using dump methods (consistent with API)
+ pipe_result.dump_md(md_writer, f"{name_without_suff}.md", image_dir) # type: ignore
+ pipe_result.dump_content_list(md_writer, f"{name_without_suff}_content_list.json", image_dir) # type: ignore
+ pipe_result.dump_middle_json(md_writer, f"{name_without_suff}_middle.json") # type: ignore
+
+ # Save model result - convert JSON string to bytes before writing
+ model_inference_result = infer_result.get_infer_res() # type: ignore
+ json_str = json.dumps(model_inference_result, ensure_ascii=False, indent=4)
+
+ try:
+ # Try to write to a file manually to avoid FileBasedDataWriter issues
+ model_file_path = os.path.join(local_md_dir, f"{name_without_suff}_model.json")
+ with open(model_file_path, 'w', encoding='utf-8') as f:
+ f.write(json_str)
+ except Exception as e:
+ print(f"Warning: Failed to save model result using file write: {str(e)}")
+ try:
+ # If direct file write fails, try using the writer with bytes encoding
+ md_writer.write(json_str.encode('utf-8'), f"{name_without_suff}_model.json") # type: ignore
+ except Exception as e2:
+ print(f"Warning: Failed to save model result using writer: {str(e2)}")
+
+ return cast(Tuple[List[Dict[str, Any]], str], (content_list, md_content))
+
+ except Exception as e:
+ print(f"Error in parse_pdf: {str(e)}")
+ raise
+
+ @staticmethod
+ def parse_office_doc(
+ doc_path: Union[str, Path],
+ output_dir: Optional[str] = None
+ ) -> Tuple[List[Dict[str, Any]], str]:
+ """
+ Parse office document (Word, PPT, etc.)
+
+ Args:
+ doc_path: Path to the document file
+ output_dir: Output directory path
+
+ Returns:
+ Tuple[List[Dict[str, Any]], str]: Tuple containing (content list JSON, Markdown text)
+ """
+ try:
+ # Convert to Path object for easier handling
+ doc_path = Path(doc_path)
+ name_without_suff = doc_path.stem
+
+ # Prepare output directories - ensure file name is in path
+ if output_dir:
+ base_output_dir = Path(output_dir)
+ local_md_dir = base_output_dir / name_without_suff
+ else:
+ local_md_dir = doc_path.parent / name_without_suff
+
+ local_image_dir = local_md_dir / "images"
+ image_dir = local_image_dir.name
+
+ # Create directories
+ os.makedirs(local_image_dir, exist_ok=True)
+ os.makedirs(local_md_dir, exist_ok=True)
+
+ # Initialize writers
+ image_writer = FileBasedDataWriter(str(local_image_dir)) # type: ignore
+ md_writer = FileBasedDataWriter(str(local_md_dir)) # type: ignore
+
+ # Read office document
+ ds = read_local_office(str(doc_path))[0] # type: ignore
+
+            # Apply chain of operations per the MS-Office example in the API docs;
+            # run the analysis once and reuse the result for all outputs
+            infer_result = ds.apply(doc_analyze, ocr=True)  # type: ignore
+            pipe_result = infer_result.pipe_txt_mode(image_writer)  # type: ignore
+            pipe_result.dump_md(md_writer, f"{name_without_suff}.md", image_dir)  # type: ignore
+
+ # Get data for return values and additional outputs
+ md_content = pipe_result.get_markdown(image_dir) # type: ignore
+ content_list = pipe_result.get_content_list(image_dir) # type: ignore
+
+ # Save additional output files
+ pipe_result.dump_content_list(md_writer, f"{name_without_suff}_content_list.json", image_dir) # type: ignore
+ pipe_result.dump_middle_json(md_writer, f"{name_without_suff}_middle.json") # type: ignore
+
+ # Save model result - convert JSON string to bytes before writing
+ model_inference_result = infer_result.get_infer_res() # type: ignore
+ json_str = json.dumps(model_inference_result, ensure_ascii=False, indent=4)
+
+ try:
+ # Try to write to a file manually to avoid FileBasedDataWriter issues
+ model_file_path = os.path.join(local_md_dir, f"{name_without_suff}_model.json")
+ with open(model_file_path, 'w', encoding='utf-8') as f:
+ f.write(json_str)
+ except Exception as e:
+ print(f"Warning: Failed to save model result using file write: {str(e)}")
+ try:
+ # If direct file write fails, try using the writer with bytes encoding
+ md_writer.write(json_str.encode('utf-8'), f"{name_without_suff}_model.json") # type: ignore
+ except Exception as e2:
+ print(f"Warning: Failed to save model result using writer: {str(e2)}")
+
+ return cast(Tuple[List[Dict[str, Any]], str], (content_list, md_content))
+
+ except Exception as e:
+ print(f"Error in parse_office_doc: {str(e)}")
+ raise
+
+ @staticmethod
+ def parse_image(
+ image_path: Union[str, Path],
+ output_dir: Optional[str] = None
+ ) -> Tuple[List[Dict[str, Any]], str]:
+ """
+ Parse image document
+
+ Args:
+ image_path: Path to the image file
+ output_dir: Output directory path
+
+ Returns:
+ Tuple[List[Dict[str, Any]], str]: Tuple containing (content list JSON, Markdown text)
+ """
+ try:
+ # Convert to Path object for easier handling
+ image_path = Path(image_path)
+ name_without_suff = image_path.stem
+
+ # Prepare output directories - ensure file name is in path
+ if output_dir:
+ base_output_dir = Path(output_dir)
+ local_md_dir = base_output_dir / name_without_suff
+ else:
+ local_md_dir = image_path.parent / name_without_suff
+
+ local_image_dir = local_md_dir / "images"
+ image_dir = local_image_dir.name
+
+ # Create directories
+ os.makedirs(local_image_dir, exist_ok=True)
+ os.makedirs(local_md_dir, exist_ok=True)
+
+ # Initialize writers
+ image_writer = FileBasedDataWriter(str(local_image_dir)) # type: ignore
+ md_writer = FileBasedDataWriter(str(local_md_dir)) # type: ignore
+
+ # Read image
+ ds = read_local_images(str(image_path))[0] # type: ignore
+
+            # Apply chain of operations per the Image example in the API docs;
+            # run the analysis once and reuse the result for all outputs
+            infer_result = ds.apply(doc_analyze, ocr=True)  # type: ignore
+            pipe_result = infer_result.pipe_ocr_mode(image_writer)  # type: ignore
+            pipe_result.dump_md(md_writer, f"{name_without_suff}.md", image_dir)  # type: ignore
+
+ # Get data for return values and additional outputs
+ md_content = pipe_result.get_markdown(image_dir) # type: ignore
+ content_list = pipe_result.get_content_list(image_dir) # type: ignore
+
+ # Save additional output files
+ pipe_result.dump_content_list(md_writer, f"{name_without_suff}_content_list.json", image_dir) # type: ignore
+ pipe_result.dump_middle_json(md_writer, f"{name_without_suff}_middle.json") # type: ignore
+
+ # Save model result - convert JSON string to bytes before writing
+ model_inference_result = infer_result.get_infer_res() # type: ignore
+ json_str = json.dumps(model_inference_result, ensure_ascii=False, indent=4)
+
+ try:
+ # Try to write to a file manually to avoid FileBasedDataWriter issues
+ model_file_path = os.path.join(local_md_dir, f"{name_without_suff}_model.json")
+ with open(model_file_path, 'w', encoding='utf-8') as f:
+ f.write(json_str)
+ except Exception as e:
+ print(f"Warning: Failed to save model result using file write: {str(e)}")
+ try:
+ # If direct file write fails, try using the writer with bytes encoding
+ md_writer.write(json_str.encode('utf-8'), f"{name_without_suff}_model.json") # type: ignore
+ except Exception as e2:
+ print(f"Warning: Failed to save model result using writer: {str(e2)}")
+
+ return cast(Tuple[List[Dict[str, Any]], str], (content_list, md_content))
+
+ except Exception as e:
+ print(f"Error in parse_image: {str(e)}")
+ raise
+
+ @staticmethod
+ def parse_document(
+ file_path: Union[str, Path],
+ parse_method: str = "auto",
+ output_dir: Optional[str] = None,
+ save_results: bool = True
+ ) -> Tuple[List[Dict[str, Any]], str]:
+ """
+ Parse document using MinerU based on file extension
+
+ Args:
+ file_path: Path to the file to be parsed
+ parse_method: Parsing method, supports "auto", "ocr", "txt", default is "auto"
+ output_dir: Output directory path, if None, use the directory of the input file
+ save_results: Whether to save parsing results to files
+
+ Returns:
+ Tuple[List[Dict[str, Any]], str]: Tuple containing (content list JSON, Markdown text)
+ """
+ # Convert to Path object
+ file_path = Path(file_path)
+ if not file_path.exists():
+ raise FileNotFoundError(f"File does not exist: {file_path}")
+
+ # Get file extension
+ ext = file_path.suffix.lower()
+
+ # Choose appropriate parser based on file type
+        if ext == ".pdf":
+ return MineruParser.parse_pdf(
+ file_path,
+ output_dir,
+ use_ocr=(parse_method == "ocr")
+ )
+ elif ext in [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"]:
+ return MineruParser.parse_image(
+ file_path,
+ output_dir
+ )
+ elif ext in [".doc", ".docx", ".ppt", ".pptx"]:
+ return MineruParser.parse_office_doc(
+ file_path,
+ output_dir
+ )
+ else:
+ # For unsupported file types, default to PDF parsing
+ print(f"Warning: Unsupported file extension '{ext}', trying generic PDF parser")
+ return MineruParser.parse_pdf(
+ file_path,
+ output_dir,
+ use_ocr=(parse_method == "ocr")
+ )
+
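The extension dispatch in `parse_document` can be exercised standalone; this sketch mirrors the routing table (the function name `choose_parser` is illustrative, not part of the module):

```python
from pathlib import Path

def choose_parser(file_path: str) -> str:
    """Mirror of MineruParser.parse_document's extension-based routing."""
    ext = Path(file_path).suffix.lower()
    if ext == ".pdf":
        return "pdf"
    if ext in {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"}:
        return "image"
    if ext in {".doc", ".docx", ".ppt", ".pptx"}:
        return "office"
    # unsupported extensions fall back to the PDF parser, as above
    return "pdf"

print(choose_parser("report.PDF"))   # pdf
print(choose_parser("scan.TIFF"))    # image
print(choose_parser("slides.pptx"))  # office
```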
+def main():
+ """
+ Main function to run the MinerU parser from command line
+ """
+ parser = argparse.ArgumentParser(description='Parse documents using MinerU')
+ parser.add_argument('file_path', help='Path to the document to parse')
+ parser.add_argument('--output', '-o', help='Output directory path')
+ parser.add_argument('--method', '-m',
+ choices=['auto', 'ocr', 'txt'],
+ default='auto',
+ help='Parsing method (auto, ocr, txt)')
+ parser.add_argument('--stats', action='store_true',
+ help='Display content statistics')
+
+ args = parser.parse_args()
+
+ try:
+ # Parse the document
+ content_list, md_content = MineruParser.parse_document(
+ file_path=args.file_path,
+ parse_method=args.method,
+ output_dir=args.output
+ )
+
+ # Display statistics if requested
+ if args.stats:
+ print("\nDocument Statistics:")
+ print(f"Total content blocks: {len(content_list)}")
+
+ # Count different types of content
+ content_types = {}
+ for item in content_list:
+ content_type = item.get('type', 'unknown')
+ content_types[content_type] = content_types.get(content_type, 0) + 1
+
+ print("\nContent Type Distribution:")
+ for content_type, count in content_types.items():
+ print(f"- {content_type}: {count}")
+
+ except Exception as e:
+ print(f"Error: {str(e)}")
+ return 1
+
+ return 0
+
+if __name__ == '__main__':
+ exit(main())
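The `--stats` pass in `main()` is a plain frequency count over block types; `collections.Counter` expresses the same loop more compactly:

```python
from collections import Counter

content_list = [
    {"type": "text"}, {"type": "image"}, {"type": "text"},
    {"type": "table"}, {},  # a block with no "type" key counts as "unknown"
]
content_types = Counter(item.get("type", "unknown") for item in content_list)
print(dict(content_types))  # {'text': 2, 'image': 1, 'table': 1, 'unknown': 1}
```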
diff --git a/lightrag/modalprocessors.py b/lightrag/modalprocessors.py
new file mode 100644
index 00000000..e4cb0a37
--- /dev/null
+++ b/lightrag/modalprocessors.py
@@ -0,0 +1,708 @@
+"""
+Specialized processors for different modalities
+
+Includes:
+- ImageModalProcessor: Specialized processor for image content
+- TableModalProcessor: Specialized processor for table content
+- EquationModalProcessor: Specialized processor for equation content
+- GenericModalProcessor: Processor for other modal content
+"""
+
+import re
+import json
+import time
+import asyncio
+import base64
+from typing import Dict, Any, Tuple, cast
+from pathlib import Path
+
+from lightrag.base import StorageNameSpace
+from lightrag.utils import (
+ logger,
+ compute_mdhash_id,
+)
+from lightrag.lightrag import LightRAG
+from dataclasses import asdict
+from lightrag.kg.shared_storage import get_namespace_data, get_pipeline_status_lock
+
+
+class BaseModalProcessor:
+ """Base class for modal processors"""
+
+ def __init__(self, lightrag: LightRAG, modal_caption_func):
+ """Initialize base processor
+
+ Args:
+ lightrag: LightRAG instance
+ modal_caption_func: Function for generating descriptions
+ """
+ self.lightrag = lightrag
+ self.modal_caption_func = modal_caption_func
+
+ # Use LightRAG's storage instances
+ self.text_chunks_db = lightrag.text_chunks
+ self.chunks_vdb = lightrag.chunks_vdb
+ self.entities_vdb = lightrag.entities_vdb
+ self.relationships_vdb = lightrag.relationships_vdb
+ self.knowledge_graph_inst = lightrag.chunk_entity_relation_graph
+
+ # Use LightRAG's configuration and functions
+ self.embedding_func = lightrag.embedding_func
+ self.llm_model_func = lightrag.llm_model_func
+ self.global_config = asdict(lightrag)
+ self.hashing_kv = lightrag.llm_response_cache
+ self.tokenizer = lightrag.tokenizer
+
+ async def process_multimodal_content(
+ self,
+ modal_content,
+ content_type: str,
+ file_path: str = "manual_creation",
+ entity_name: str = None,
+ ) -> Tuple[str, Dict[str, Any]]:
+ """Process multimodal content"""
+ # Subclasses need to implement specific processing logic
+ raise NotImplementedError("Subclasses must implement this method")
+
+ async def _create_entity_and_chunk(
+ self, modal_chunk: str, entity_info: Dict[str, Any],
+ file_path: str) -> Tuple[str, Dict[str, Any]]:
+ """Create entity and text chunk"""
+ # Create chunk
+ chunk_id = compute_mdhash_id(str(modal_chunk), prefix="chunk-")
+ tokens = len(self.tokenizer.encode(modal_chunk))
+
+ chunk_data = {
+ "tokens": tokens,
+ "content": modal_chunk,
+ "chunk_order_index": 0,
+ "full_doc_id": chunk_id,
+ "file_path": file_path,
+ }
+
+ # Store chunk
+ await self.text_chunks_db.upsert({chunk_id: chunk_data})
+
+ # Create entity node
+ node_data = {
+ "entity_id": entity_info["entity_name"],
+ "entity_type": entity_info["entity_type"],
+ "description": entity_info["summary"],
+ "source_id": chunk_id,
+ "file_path": file_path,
+ "created_at": int(time.time()),
+ }
+
+ await self.knowledge_graph_inst.upsert_node(entity_info["entity_name"],
+ node_data)
+
+ # Insert entity into vector database
+        entity_vdb_data = {
+            compute_mdhash_id(entity_info["entity_name"], prefix="ent-"): {
+                "entity_name": entity_info["entity_name"],
+                "entity_type": entity_info["entity_type"],
+                "content": f"{entity_info['entity_name']}\n{entity_info['summary']}",
+                "source_id": chunk_id,
+                "file_path": file_path,
+            }
+        }
+ await self.entities_vdb.upsert(entity_vdb_data)
+
+ # Process entity and relationship extraction
+ await self._process_chunk_for_extraction(chunk_id,
+ entity_info["entity_name"])
+
+ # Ensure all storage updates are complete
+ await self._insert_done()
+
+ return entity_info["summary"], {
+ "entity_name": entity_info["entity_name"],
+ "entity_type": entity_info["entity_type"],
+ "description": entity_info["summary"],
+ "chunk_id": chunk_id
+ }
+
+ async def _process_chunk_for_extraction(self, chunk_id: str,
+ modal_entity_name: str):
+ """Process chunk for entity and relationship extraction"""
+ chunk_data = await self.text_chunks_db.get_by_id(chunk_id)
+ if not chunk_data:
+ logger.error(f"Chunk {chunk_id} not found")
+ return
+
+ # Create text chunk for vector database
+ chunk_vdb_data = {
+ chunk_id: {
+ "content": chunk_data["content"],
+ "full_doc_id": chunk_id,
+ "tokens": chunk_data["tokens"],
+ "chunk_order_index": chunk_data["chunk_order_index"],
+ "file_path": chunk_data["file_path"],
+ }
+ }
+
+ await self.chunks_vdb.upsert(chunk_vdb_data)
+
+ # Trigger extraction process
+ from lightrag.operate import extract_entities, merge_nodes_and_edges
+
+ pipeline_status = await get_namespace_data("pipeline_status")
+ pipeline_status_lock = get_pipeline_status_lock()
+
+ # Prepare chunk for extraction
+ chunks = {chunk_id: chunk_data}
+
+ # Extract entities and relationships
+ chunk_results = await extract_entities(
+ chunks=chunks,
+ global_config=self.global_config,
+ pipeline_status=pipeline_status,
+ pipeline_status_lock=pipeline_status_lock,
+ llm_response_cache=self.hashing_kv,
+ )
+
+ # Add "belongs_to" relationships for all extracted entities
+ for maybe_nodes, _ in chunk_results:
+ for entity_name in maybe_nodes.keys():
+ if entity_name != modal_entity_name: # Skip self-relationship
+ # Create belongs_to relationship
+                    relation_data = {
+                        "description": f"Entity {entity_name} belongs to {modal_entity_name}",
+                        "keywords": "belongs_to,part_of,contained_in",
+                        "source_id": chunk_id,
+                        "weight": 10.0,
+                        "file_path": chunk_data.get("file_path", "manual_creation"),
+                    }
+ await self.knowledge_graph_inst.upsert_edge(
+ entity_name, modal_entity_name, relation_data)
+
+                    relation_id = compute_mdhash_id(
+                        entity_name + modal_entity_name, prefix="rel-")
+                    relation_vdb_data = {
+                        relation_id: {
+                            "src_id": entity_name,
+                            "tgt_id": modal_entity_name,
+                            "keywords": relation_data["keywords"],
+                            "content": f"{relation_data['keywords']}\t{entity_name}\n{modal_entity_name}\n{relation_data['description']}",
+                            "source_id": chunk_id,
+                            "file_path": chunk_data.get("file_path", "manual_creation"),
+                        }
+                    }
+ await self.relationships_vdb.upsert(relation_vdb_data)
+
+ await merge_nodes_and_edges(
+ chunk_results=chunk_results,
+ knowledge_graph_inst=self.knowledge_graph_inst,
+ entity_vdb=self.entities_vdb,
+ relationships_vdb=self.relationships_vdb,
+ global_config=self.global_config,
+ pipeline_status=pipeline_status,
+ pipeline_status_lock=pipeline_status_lock,
+ llm_response_cache=self.hashing_kv,
+ )
+
+ async def _insert_done(self) -> None:
+ await asyncio.gather(*[
+ cast(StorageNameSpace, storage_inst).index_done_callback()
+ for storage_inst in [
+ self.text_chunks_db,
+ self.chunks_vdb,
+ self.entities_vdb,
+ self.relationships_vdb,
+ self.knowledge_graph_inst,
+ ]
+ ])
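The `chunk-`, `ent-`, and `rel-` identifiers used throughout this base class all come from `compute_mdhash_id`; a minimal stand-in (assuming the LightRAG helper is a prefixed MD5 hex digest) behaves like this:

```python
from hashlib import md5

def mdhash_id(content: str, prefix: str = "") -> str:
    # stand-in for lightrag.utils.compute_mdhash_id (assumed: prefix + md5 hex)
    return prefix + md5(content.encode()).hexdigest()

chunk_id = mdhash_id("Visual Analysis: a cat on a sofa", prefix="chunk-")
print(chunk_id.startswith("chunk-"), len(chunk_id) - len("chunk-"))  # True 32
```

Because the id is a pure function of the content, re-inserting the same modal chunk upserts the same record rather than duplicating it.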
+
+
+class ImageModalProcessor(BaseModalProcessor):
+ """Processor specialized for image content"""
+
+ def __init__(self, lightrag: LightRAG, modal_caption_func):
+ """Initialize image processor
+
+ Args:
+ lightrag: LightRAG instance
+ modal_caption_func: Function for generating descriptions (supporting image understanding)
+ """
+ super().__init__(lightrag, modal_caption_func)
+
+ def _encode_image_to_base64(self, image_path: str) -> str:
+ """Encode image to base64"""
+ try:
+ with open(image_path, "rb") as image_file:
+ encoded_string = base64.b64encode(
+ image_file.read()).decode('utf-8')
+ return encoded_string
+ except Exception as e:
+ logger.error(f"Failed to encode image {image_path}: {e}")
+ return ""
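`_encode_image_to_base64` produces the ASCII payload that vision model APIs expect; the encoding round-trips the raw bytes exactly:

```python
import base64

raw = b"\x89PNG\r\n\x1a\n"  # a PNG file signature, standing in for real image bytes
encoded = base64.b64encode(raw).decode("utf-8")
print(base64.b64decode(encoded) == raw)  # True
```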
+
+ async def process_multimodal_content(
+ self,
+ modal_content,
+ content_type: str,
+ file_path: str = "manual_creation",
+ entity_name: str = None,
+ ) -> Tuple[str, Dict[str, Any]]:
+ """Process image content"""
+ try:
+ # Parse image content
+ if isinstance(modal_content, str):
+ try:
+ content_data = json.loads(modal_content)
+ except json.JSONDecodeError:
+ content_data = {"description": modal_content}
+ else:
+ content_data = modal_content
+
+ image_path = content_data.get("img_path")
+ captions = content_data.get("img_caption", [])
+ footnotes = content_data.get("img_footnote", [])
+
+ # Build detailed visual analysis prompt
+ vision_prompt = f"""Please analyze this image in detail and provide a JSON response with the following structure:
+
+ {{
+ "detailed_description": "A comprehensive and detailed visual description of the image following these guidelines:
+ - Describe the overall composition and layout
+ - Identify all objects, people, text, and visual elements
+ - Explain relationships between elements
+ - Note colors, lighting, and visual style
+ - Describe any actions or activities shown
+ - Include technical details if relevant (charts, diagrams, etc.)
+ - Always use specific names instead of pronouns",
+ "entity_info": {{
+ "entity_name": "{entity_name if entity_name else 'unique descriptive name for this image'}",
+ "entity_type": "image",
+ "summary": "concise summary of the image content and its significance (max 100 words)"
+ }}
+ }}
+
+ Additional context:
+ - Image Path: {image_path}
+ - Captions: {captions if captions else 'None'}
+ - Footnotes: {footnotes if footnotes else 'None'}
+
+ Focus on providing accurate, detailed visual analysis that would be useful for knowledge retrieval."""
+
+ # If image path exists, try to encode image
+ image_base64 = ""
+ if image_path and Path(image_path).exists():
+ image_base64 = self._encode_image_to_base64(image_path)
+
+ # Call vision model
+ if image_base64:
+ # Use real image for analysis
+                response = await self.modal_caption_func(
+                    vision_prompt,
+                    image_data=image_base64,
+                    system_prompt="You are an expert image analyst. Provide detailed, accurate descriptions.",
+                )
+ else:
+ # Analyze based on existing text information
+ text_prompt = f"""Based on the following image information, provide analysis:
+
+ Image Path: {image_path}
+ Captions: {captions}
+ Footnotes: {footnotes}
+
+ {vision_prompt}"""
+
+                response = await self.modal_caption_func(
+                    text_prompt,
+                    system_prompt="You are an expert image analyst. Provide detailed analysis based on available information.",
+                )
+
+ # Parse response
+ enhanced_caption, entity_info = self._parse_response(
+ response, entity_name)
+
+ # Build complete image content
+ modal_chunk = f"""
+ Image Content Analysis:
+ Image Path: {image_path}
+ Captions: {', '.join(captions) if captions else 'None'}
+ Footnotes: {', '.join(footnotes) if footnotes else 'None'}
+
+ Visual Analysis: {enhanced_caption}"""
+
+ return await self._create_entity_and_chunk(modal_chunk,
+ entity_info, file_path)
+
+ except Exception as e:
+ logger.error(f"Error processing image content: {e}")
+ # Fallback processing
+ fallback_entity = {
+ "entity_name": entity_name if entity_name else
+ f"image_{compute_mdhash_id(str(modal_content))}",
+ "entity_type": "image",
+ "summary": f"Image content: {str(modal_content)[:100]}"
+ }
+ return str(modal_content), fallback_entity
+
+ def _parse_response(self,
+ response: str,
+ entity_name: str = None) -> Tuple[str, Dict[str, Any]]:
+ """Parse model response"""
+ try:
+ response_data = json.loads(
+ re.search(r"\{.*\}", response, re.DOTALL).group(0))
+
+ description = response_data.get("detailed_description", "")
+ entity_data = response_data.get("entity_info", {})
+
+ if not description or not entity_data:
+ raise ValueError("Missing required fields in response")
+
+ if not all(key in entity_data
+ for key in ["entity_name", "entity_type", "summary"]):
+ raise ValueError("Missing required fields in entity_info")
+
+            # A caller-supplied name wins; otherwise tag the generated name with its type
+            if entity_name:
+                entity_data["entity_name"] = entity_name
+            else:
+                entity_data["entity_name"] = f"{entity_data['entity_name']} ({entity_data['entity_type']})"
+
+ return description, entity_data
+
+ except (json.JSONDecodeError, AttributeError, ValueError) as e:
+ logger.error(f"Error parsing image analysis response: {e}")
+            fallback_entity = {
+                "entity_name": entity_name if entity_name else f"image_{compute_mdhash_id(response)}",
+                "entity_type": "image",
+                "summary": response[:100] + "..." if len(response) > 100 else response,
+            }
+ return response, fallback_entity
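All four `_parse_*_response` helpers share the same recovery pattern: grab the outermost `{...}` span from a possibly chatty model reply, then `json.loads` it, falling back when nothing parses. In isolation:

```python
import json
import re

def extract_json_block(response: str) -> dict:
    """First-to-last-brace extraction, as used by the _parse_*_response helpers."""
    match = re.search(r"\{.*\}", response, re.DOTALL)
    if match is None:
        raise ValueError("no JSON object found in response")
    return json.loads(match.group(0))

reply = (
    'Sure, here is the analysis:\n'
    '{"detailed_description": "a bar chart", "entity_info": '
    '{"entity_name": "sales chart", "entity_type": "image", "summary": "Q1 sales"}}'
)
data = extract_json_block(reply)
print(data["entity_info"]["entity_type"])  # image
```

Note the greedy `.*` with `re.DOTALL` spans from the first `{` to the last `}`, which tolerates prose before and after the JSON but assumes the reply contains only one object.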
+
+
+class TableModalProcessor(BaseModalProcessor):
+ """Processor specialized for table content"""
+
+ async def process_multimodal_content(
+ self,
+ modal_content,
+ content_type: str,
+ file_path: str = "manual_creation",
+ entity_name: str = None,
+ ) -> Tuple[str, Dict[str, Any]]:
+ """Process table content"""
+ # Parse table content
+ if isinstance(modal_content, str):
+ try:
+ content_data = json.loads(modal_content)
+ except json.JSONDecodeError:
+ content_data = {"table_body": modal_content}
+ else:
+ content_data = modal_content
+
+ table_img_path = content_data.get("img_path")
+ table_caption = content_data.get("table_caption", [])
+ table_body = content_data.get("table_body", "")
+ table_footnote = content_data.get("table_footnote", [])
+
+ # Build table analysis prompt
+ table_prompt = f"""Please analyze this table content and provide a JSON response with the following structure:
+
+ {{
+ "detailed_description": "A comprehensive analysis of the table including:
+ - Table structure and organization
+ - Column headers and their meanings
+ - Key data points and patterns
+ - Statistical insights and trends
+ - Relationships between data elements
+ - Significance of the data presented
+ Always use specific names and values instead of general references.",
+ "entity_info": {{
+ "entity_name": "{entity_name if entity_name else 'descriptive name for this table'}",
+ "entity_type": "table",
+ "summary": "concise summary of the table's purpose and key findings (max 100 words)"
+ }}
+ }}
+
+ Table Information:
+ Image Path: {table_img_path}
+ Caption: {table_caption if table_caption else 'None'}
+ Body: {table_body}
+ Footnotes: {table_footnote if table_footnote else 'None'}
+
+ Focus on extracting meaningful insights and relationships from the tabular data."""
+
+        response = await self.modal_caption_func(
+            table_prompt,
+            system_prompt="You are an expert data analyst. Provide detailed table analysis with specific insights.",
+        )
+
+ # Parse response
+ enhanced_caption, entity_info = self._parse_table_response(
+ response, entity_name)
+
+        # TODO: Add retry mechanism
+
+ # Build complete table content
+ modal_chunk = f"""Table Analysis:
+ Image Path: {table_img_path}
+ Caption: {', '.join(table_caption) if table_caption else 'None'}
+ Structure: {table_body}
+ Footnotes: {', '.join(table_footnote) if table_footnote else 'None'}
+
+ Analysis: {enhanced_caption}"""
+
+ return await self._create_entity_and_chunk(modal_chunk, entity_info,
+ file_path)
+
+ def _parse_table_response(
+ self,
+ response: str,
+ entity_name: str = None) -> Tuple[str, Dict[str, Any]]:
+ """Parse table analysis response"""
+ try:
+ response_data = json.loads(
+ re.search(r"\{.*\}", response, re.DOTALL).group(0))
+
+ description = response_data.get("detailed_description", "")
+ entity_data = response_data.get("entity_info", {})
+
+ if not description or not entity_data:
+ raise ValueError("Missing required fields in response")
+
+ if not all(key in entity_data
+ for key in ["entity_name", "entity_type", "summary"]):
+ raise ValueError("Missing required fields in entity_info")
+
+            # A caller-supplied name wins; otherwise tag the generated name with its type
+            if entity_name:
+                entity_data["entity_name"] = entity_name
+            else:
+                entity_data["entity_name"] = f"{entity_data['entity_name']} ({entity_data['entity_type']})"
+
+ return description, entity_data
+
+ except (json.JSONDecodeError, AttributeError, ValueError) as e:
+ logger.error(f"Error parsing table analysis response: {e}")
+            fallback_entity = {
+                "entity_name": entity_name if entity_name else f"table_{compute_mdhash_id(response)}",
+                "entity_type": "table",
+                "summary": response[:100] + "..." if len(response) > 100 else response,
+            }
+ return response, fallback_entity
+
+
+class EquationModalProcessor(BaseModalProcessor):
+ """Processor specialized for equation content"""
+
+ async def process_multimodal_content(
+ self,
+ modal_content,
+ content_type: str,
+ file_path: str = "manual_creation",
+ entity_name: str = None,
+ ) -> Tuple[str, Dict[str, Any]]:
+ """Process equation content"""
+ # Parse equation content
+ if isinstance(modal_content, str):
+ try:
+ content_data = json.loads(modal_content)
+ except json.JSONDecodeError:
+ content_data = {"equation": modal_content}
+ else:
+ content_data = modal_content
+
+ equation_text = content_data.get("text")
+ equation_format = content_data.get("text_format", "")
+
+ # Build equation analysis prompt
+ equation_prompt = f"""Please analyze this mathematical equation and provide a JSON response with the following structure:
+
+ {{
+ "detailed_description": "A comprehensive analysis of the equation including:
+ - Mathematical meaning and interpretation
+ - Variables and their definitions
+ - Mathematical operations and functions used
+ - Application domain and context
+ - Physical or theoretical significance
+ - Relationship to other mathematical concepts
+ - Practical applications or use cases
+ Always use specific mathematical terminology.",
+ "entity_info": {{
+ "entity_name": "{entity_name if entity_name else 'descriptive name for this equation'}",
+ "entity_type": "equation",
+ "summary": "concise summary of the equation's purpose and significance (max 100 words)"
+ }}
+ }}
+
+ Equation Information:
+ Equation: {equation_text}
+ Format: {equation_format}
+
+ Focus on providing mathematical insights and explaining the equation's significance."""
+
+        response = await self.modal_caption_func(
+            equation_prompt,
+            system_prompt="You are an expert mathematician. Provide detailed mathematical analysis.",
+        )
+
+ # Parse response
+ enhanced_caption, entity_info = self._parse_equation_response(
+ response, entity_name)
+
+ # Build complete equation content
+ modal_chunk = f"""Mathematical Equation Analysis:
+ Equation: {equation_text}
+ Format: {equation_format}
+
+ Mathematical Analysis: {enhanced_caption}"""
+
+ return await self._create_entity_and_chunk(modal_chunk, entity_info,
+ file_path)
+
+ def _parse_equation_response(
+ self,
+ response: str,
+ entity_name: str = None) -> Tuple[str, Dict[str, Any]]:
+ """Parse equation analysis response"""
+ try:
+ response_data = json.loads(
+ re.search(r"\{.*\}", response, re.DOTALL).group(0))
+
+ description = response_data.get("detailed_description", "")
+ entity_data = response_data.get("entity_info", {})
+
+ if not description or not entity_data:
+ raise ValueError("Missing required fields in response")
+
+ if not all(key in entity_data
+ for key in ["entity_name", "entity_type", "summary"]):
+ raise ValueError("Missing required fields in entity_info")
+
+            # A caller-supplied name wins; otherwise tag the generated name with its type
+            if entity_name:
+                entity_data["entity_name"] = entity_name
+            else:
+                entity_data["entity_name"] = f"{entity_data['entity_name']} ({entity_data['entity_type']})"
+
+ return description, entity_data
+
+ except (json.JSONDecodeError, AttributeError, ValueError) as e:
+ logger.error(f"Error parsing equation analysis response: {e}")
+            fallback_entity = {
+                "entity_name": entity_name if entity_name else f"equation_{compute_mdhash_id(response)}",
+                "entity_type": "equation",
+                "summary": response[:100] + "..." if len(response) > 100 else response,
+            }
+ return response, fallback_entity
+
+
+class GenericModalProcessor(BaseModalProcessor):
+ """Generic processor for other types of modal content"""
+
+ async def process_multimodal_content(
+ self,
+ modal_content,
+ content_type: str,
+ file_path: str = "manual_creation",
+ entity_name: str = None,
+ ) -> Tuple[str, Dict[str, Any]]:
+ """Process generic modal content"""
+ # Build generic analysis prompt
+ generic_prompt = f"""Please analyze this {content_type} content and provide a JSON response with the following structure:
+
+ {{
+ "detailed_description": "A comprehensive analysis of the content including:
+ - Content structure and organization
+ - Key information and elements
+ - Relationships between components
+ - Context and significance
+ - Relevant details for knowledge retrieval
+ Always use specific terminology appropriate for {content_type} content.",
+ "entity_info": {{
+ "entity_name": "{entity_name if entity_name else f'descriptive name for this {content_type}'}",
+ "entity_type": "{content_type}",
+ "summary": "concise summary of the content's purpose and key points (max 100 words)"
+ }}
+ }}
+
+ Content: {str(modal_content)}
+
+ Focus on extracting meaningful information that would be useful for knowledge retrieval."""
+
+        response = await self.modal_caption_func(
+            generic_prompt,
+            system_prompt=f"You are an expert content analyst specializing in {content_type} content.",
+        )
+
+ # Parse response
+ enhanced_caption, entity_info = self._parse_generic_response(
+ response, entity_name, content_type)
+
+ # Build complete content
+ modal_chunk = f"""{content_type.title()} Content Analysis:
+ Content: {str(modal_content)}
+
+ Analysis: {enhanced_caption}"""
+
+ return await self._create_entity_and_chunk(modal_chunk, entity_info,
+ file_path)
+
+ def _parse_generic_response(
+ self,
+ response: str,
+ entity_name: str = None,
+ content_type: str = "content") -> Tuple[str, Dict[str, Any]]:
+ """Parse generic analysis response"""
+ try:
+ response_data = json.loads(
+ re.search(r"\{.*\}", response, re.DOTALL).group(0))
+
+ description = response_data.get("detailed_description", "")
+ entity_data = response_data.get("entity_info", {})
+
+ if not description or not entity_data:
+ raise ValueError("Missing required fields in response")
+
+ if not all(key in entity_data
+ for key in ["entity_name", "entity_type", "summary"]):
+ raise ValueError("Missing required fields in entity_info")
+
+            # A caller-supplied name wins; otherwise tag the generated name with its type
+            if entity_name:
+                entity_data["entity_name"] = entity_name
+            else:
+                entity_data["entity_name"] = f"{entity_data['entity_name']} ({entity_data['entity_type']})"
+
+ return description, entity_data
+
+ except (json.JSONDecodeError, AttributeError, ValueError) as e:
+ logger.error(f"Error parsing generic analysis response: {e}")
+            fallback_entity = {
+                "entity_name": entity_name if entity_name else f"{content_type}_{compute_mdhash_id(response)}",
+                "entity_type": content_type,
+                "summary": response[:100] + "..." if len(response) > 100 else response,
+            }
+ return response, fallback_entity
diff --git a/lightrag/raganything.py b/lightrag/raganything.py
new file mode 100644
index 00000000..af98a104
--- /dev/null
+++ b/lightrag/raganything.py
@@ -0,0 +1,632 @@
+"""
+Complete MinerU parsing + multimodal content insertion pipeline
+
+This module integrates:
+1. MinerU document parsing
+2. Pure text content LightRAG insertion
+3. Specialized processing for multimodal content (using different processors)
+"""
+
+import os
+import asyncio
+import logging
+from pathlib import Path
+from typing import Dict, List, Any, Tuple, Optional, Callable
+import sys
+
+# Add project root directory to Python path
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+from lightrag import LightRAG, QueryParam
+from lightrag.utils import EmbeddingFunc, setup_logger
+
+# Import parser and multimodal processors
+from lightrag.mineru_parser import MineruParser
+
+# Import specialized processors
+from lightrag.modalprocessors import (
+ ImageModalProcessor,
+ TableModalProcessor,
+ EquationModalProcessor,
+ GenericModalProcessor
+)
+
+
+class RAGAnything:
+    """Complete document parsing and insertion pipeline built on MinerU and LightRAG"""
+
+ def __init__(
+ self,
+ lightrag: Optional[LightRAG] = None,
+ llm_model_func: Optional[Callable] = None,
+ vision_model_func: Optional[Callable] = None,
+ embedding_func: Optional[Callable] = None,
+ working_dir: str = "./rag_storage",
+ embedding_dim: int = 3072,
+ max_token_size: int = 8192
+ ):
+ """
+ Initialize Multimodal Document Processing Pipeline
+
+ Args:
+ lightrag: Optional pre-initialized LightRAG instance
+ llm_model_func: LLM model function for text analysis
+ vision_model_func: Vision model function for image analysis
+ embedding_func: Embedding function for text vectorization
+ working_dir: Working directory for storage (used when creating new RAG)
+ embedding_dim: Embedding dimension (used when creating new RAG)
+ max_token_size: Maximum token size for embeddings (used when creating new RAG)
+ """
+ self.working_dir = working_dir
+ self.llm_model_func = llm_model_func
+ self.vision_model_func = vision_model_func
+ self.embedding_func = embedding_func
+ self.embedding_dim = embedding_dim
+ self.max_token_size = max_token_size
+
+ # Set up logging
+ setup_logger("RAGAnything")
+ self.logger = logging.getLogger("RAGAnything")
+
+ # Create working directory if needed
+ if not os.path.exists(working_dir):
+ os.makedirs(working_dir)
+
+ # Use provided LightRAG or mark for later initialization
+ self.lightrag = lightrag
+ self.modal_processors = {}
+
+ # If LightRAG is provided, initialize processors immediately
+ if self.lightrag is not None:
+ self._initialize_processors()
+
+ def _initialize_processors(self):
+ """Initialize multimodal processors with appropriate model functions"""
+ if self.lightrag is None:
+ raise ValueError("LightRAG instance must be initialized before creating processors")
+
+ # Create different multimodal processors
+ self.modal_processors = {
+ "image": ImageModalProcessor(
+ lightrag=self.lightrag,
+ modal_caption_func=self.vision_model_func or self.llm_model_func
+ ),
+ "table": TableModalProcessor(
+ lightrag=self.lightrag,
+ modal_caption_func=self.llm_model_func
+ ),
+ "equation": EquationModalProcessor(
+ lightrag=self.lightrag,
+ modal_caption_func=self.llm_model_func
+ ),
+ "generic": GenericModalProcessor(
+ lightrag=self.lightrag,
+ modal_caption_func=self.llm_model_func
+ )
+ }
+
+ self.logger.info("Multimodal processors initialized")
+ self.logger.info(f"Available processors: {list(self.modal_processors.keys())}")
+
+ async def _ensure_lightrag_initialized(self):
+ """Ensure LightRAG instance is initialized, create if necessary"""
+ if self.lightrag is not None:
+ return
+
+ # Validate required functions
+ if self.llm_model_func is None:
+ raise ValueError("llm_model_func must be provided when LightRAG is not pre-initialized")
+ if self.embedding_func is None:
+ raise ValueError("embedding_func must be provided when LightRAG is not pre-initialized")
+
+ from lightrag.kg.shared_storage import initialize_pipeline_status
+
+ # Create LightRAG instance with provided functions
+ self.lightrag = LightRAG(
+ working_dir=self.working_dir,
+ llm_model_func=self.llm_model_func,
+ embedding_func=EmbeddingFunc(
+ embedding_dim=self.embedding_dim,
+ max_token_size=self.max_token_size,
+ func=self.embedding_func,
+ ),
+ )
+
+ await self.lightrag.initialize_storages()
+ await initialize_pipeline_status()
+
+ # Initialize processors after LightRAG is ready
+ self._initialize_processors()
+
+ self.logger.info("LightRAG and multimodal processors initialized")
+
+ def parse_document(
+ self,
+ file_path: str,
+ output_dir: str = "./output",
+ parse_method: str = "auto",
+ display_stats: bool = True
+ ) -> Tuple[List[Dict[str, Any]], str]:
+ """
+ Parse document using MinerU
+
+ Args:
+ file_path: Path to the file to parse
+ output_dir: Output directory
+ parse_method: Parse method ("auto", "ocr", "txt")
+ display_stats: Whether to display content statistics
+
+ Returns:
+ (content_list, md_content): Content list and markdown text
+ """
+ self.logger.info(f"Starting document parsing: {file_path}")
+
+ file_path = Path(file_path)
+ if not file_path.exists():
+ raise FileNotFoundError(f"File not found: {file_path}")
+
+ # Choose appropriate parsing method based on file extension
+ ext = file_path.suffix.lower()
+
+ try:
+ if ext == ".pdf":
+ self.logger.info(f"Detected PDF file, using PDF parser (OCR={parse_method == 'ocr'})...")
+ content_list, md_content = MineruParser.parse_pdf(
+ file_path,
+ output_dir,
+ use_ocr=(parse_method == "ocr")
+ )
+ elif ext in [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"]:
+ self.logger.info("Detected image file, using image parser...")
+ content_list, md_content = MineruParser.parse_image(
+ file_path,
+ output_dir
+ )
+ elif ext in [".doc", ".docx", ".ppt", ".pptx"]:
+ self.logger.info("Detected Office document, using Office parser...")
+ content_list, md_content = MineruParser.parse_office_doc(
+ file_path,
+ output_dir
+ )
+ else:
+ # For other or unknown formats, use generic parser
+ self.logger.info(f"Using generic parser for {ext} file (method={parse_method})...")
+ content_list, md_content = MineruParser.parse_document(
+ file_path,
+ parse_method=parse_method,
+ output_dir=output_dir
+ )
+
+ except Exception as e:
+ self.logger.error(f"Error during parsing with specific parser: {str(e)}")
+ self.logger.warning("Falling back to generic parser...")
+ # If specific parser fails, fall back to generic parser
+ content_list, md_content = MineruParser.parse_document(
+ file_path,
+ parse_method=parse_method,
+ output_dir=output_dir
+ )
+
+ self.logger.info(f"Parsing complete! Extracted {len(content_list)} content blocks")
+ self.logger.info(f"Markdown text length: {len(md_content)} characters")
+
+ # Display content statistics if requested
+ if display_stats:
+ self.logger.info("\nContent Information:")
+ self.logger.info(f"* Total blocks in content_list: {len(content_list)}")
+ self.logger.info(f"* Markdown content length: {len(md_content)} characters")
+
+ # Count elements by type
+ block_types: Dict[str, int] = {}
+ for block in content_list:
+ if isinstance(block, dict):
+ block_type = block.get("type", "unknown")
+ if isinstance(block_type, str):
+ block_types[block_type] = block_types.get(block_type, 0) + 1
+
+ self.logger.info("* Content block types:")
+ for block_type, count in block_types.items():
+ self.logger.info(f" - {block_type}: {count}")
+
+ return content_list, md_content
+
+ def _separate_content(self, content_list: List[Dict[str, Any]]) -> Tuple[str, List[Dict[str, Any]]]:
+ """
+ Separate text content and multimodal content
+
+ Args:
+ content_list: Content list from MinerU parsing
+
+ Returns:
+ (text_content, multimodal_items): Pure text content and multimodal items list
+ """
+ text_parts = []
+ multimodal_items = []
+
+ for item in content_list:
+ content_type = item.get("type", "text")
+
+ if content_type == "text":
+ # Text content
+ text = item.get("text", "")
+ if text.strip():
+ text_parts.append(text)
+ else:
+ # Multimodal content (image, table, equation, etc.)
+ multimodal_items.append(item)
+
+ # Merge all text content
+ text_content = "\n\n".join(text_parts)
+
+ self.logger.info("Content separation complete:")
+ self.logger.info(f" - Text content length: {len(text_content)} characters")
+ self.logger.info(f" - Multimodal items count: {len(multimodal_items)}")
+
+ # Count multimodal types
+ modal_types = {}
+ for item in multimodal_items:
+ modal_type = item.get("type", "unknown")
+ modal_types[modal_type] = modal_types.get(modal_type, 0) + 1
+
+ if modal_types:
+ self.logger.info(f" - Multimodal type distribution: {modal_types}")
+
+ return text_content, multimodal_items
+
+ async def _insert_text_content(
+ self,
+ input: str | list[str],
+ split_by_character: str | None = None,
+ split_by_character_only: bool = False,
+ ids: str | list[str] | None = None,
+ file_paths: str | list[str] | None = None,
+ ):
+ """
+ Insert pure text content into LightRAG
+
+ Args:
+ input: Single document string or list of document strings
+ split_by_character: If provided, split the text by this character; any chunk longer than
+ chunk_token_size is then split again by token size.
+ split_by_character_only: If True, split only by the specified character. Ignored when
+ split_by_character is None.
+ ids: Single document ID or list of unique document IDs; if not provided, MD5 hash IDs are generated
+ file_paths: Single file path or list of file paths, used for citation
+ """
+ self.logger.info("Starting text content insertion into LightRAG...")
+
+ # Use LightRAG's insert method with all parameters
+ await self.lightrag.ainsert(
+ input=input,
+ file_paths=file_paths,
+ split_by_character=split_by_character,
+ split_by_character_only=split_by_character_only,
+ ids=ids
+ )
+
+ self.logger.info("Text content insertion complete")
+
+ async def _process_multimodal_content(self, multimodal_items: List[Dict[str, Any]], file_path: str):
+ """
+ Process multimodal content (using specialized processors)
+
+ Args:
+ multimodal_items: List of multimodal items
+ file_path: File path (for reference)
+ """
+ if not multimodal_items:
+ self.logger.debug("No multimodal content to process")
+ return
+
+ self.logger.info("Starting multimodal content processing...")
+
+ file_name = os.path.basename(file_path)
+
+ for i, item in enumerate(multimodal_items):
+ try:
+ content_type = item.get("type", "unknown")
+ self.logger.info(f"Processing item {i+1}/{len(multimodal_items)}: {content_type} content")
+
+ # Select appropriate processor
+ processor = self._get_processor_for_type(content_type)
+
+ if processor:
+ enhanced_caption, entity_info = await processor.process_multimodal_content(
+ modal_content=item,
+ content_type=content_type,
+ file_path=file_name
+ )
+ self.logger.info(f"{content_type} processing complete: {entity_info.get('entity_name', 'Unknown')}")
+ else:
+ self.logger.warning(f"No suitable processor found for {content_type} type content")
+
+ except Exception as e:
+ self.logger.error(f"Error processing multimodal content: {str(e)}")
+ self.logger.debug("Exception details:", exc_info=True)
+ continue
+
+ self.logger.info("Multimodal content processing complete")
+
+ def _get_processor_for_type(self, content_type: str):
+ """
+ Get appropriate processor based on content type
+
+ Args:
+ content_type: Content type
+
+ Returns:
+ Corresponding processor instance
+ """
+ # Direct mapping to corresponding processor
+ if content_type == "image":
+ return self.modal_processors.get("image")
+ elif content_type == "table":
+ return self.modal_processors.get("table")
+ elif content_type == "equation":
+ return self.modal_processors.get("equation")
+ else:
+ # For other types, use generic processor
+ return self.modal_processors.get("generic")
+
+ async def process_document_complete(
+ self,
+ file_path: str,
+ output_dir: str = "./output",
+ parse_method: str = "auto",
+ display_stats: bool = True,
+ split_by_character: str | None = None,
+ split_by_character_only: bool = False,
+ doc_id: str | None = None
+ ):
+ """
+ Complete document processing workflow
+
+ Args:
+ file_path: Path to the file to process
+ output_dir: MinerU output directory
+ parse_method: Parse method
+ display_stats: Whether to display content statistics
+ split_by_character: Optional character to split the text by
+ split_by_character_only: If True, split only by the specified character
+ doc_id: Optional document ID; if not provided, an MD5 hash ID is generated
+ """
+ # Ensure LightRAG is initialized
+ await self._ensure_lightrag_initialized()
+
+ self.logger.info(f"Starting complete document processing: {file_path}")
+
+ # Step 1: Parse document using MinerU
+ content_list, md_content = self.parse_document(
+ file_path,
+ output_dir,
+ parse_method,
+ display_stats
+ )
+
+ # Step 2: Separate text and multimodal content
+ text_content, multimodal_items = self._separate_content(content_list)
+
+ # Step 3: Insert pure text content with all parameters
+ if text_content.strip():
+ file_name = os.path.basename(file_path)
+ await self._insert_text_content(
+ text_content,
+ file_paths=file_name,
+ split_by_character=split_by_character,
+ split_by_character_only=split_by_character_only,
+ ids=doc_id
+ )
+
+ # Step 4: Process multimodal content (using specialized processors)
+ if multimodal_items:
+ await self._process_multimodal_content(multimodal_items, file_path)
+
+ self.logger.info(f"Document {file_path} processing complete!")
+
+ async def process_folder_complete(
+ self,
+ folder_path: str,
+ output_dir: str = "./output",
+ parse_method: str = "auto",
+ display_stats: bool = False,
+ split_by_character: str | None = None,
+ split_by_character_only: bool = False,
+ file_extensions: Optional[List[str]] = None,
+ recursive: bool = True,
+ max_workers: int = 1
+ ):
+ """
+ Process all files in a folder in batch
+
+ Args:
+ folder_path: Path to the folder to process
+ output_dir: MinerU output directory
+ parse_method: Parse method
+ display_stats: Whether to display content statistics for each file (recommended False for batch processing)
+ split_by_character: Optional character to split text by
+ split_by_character_only: If True, split only by the specified character
+ file_extensions: List of file extensions to process, e.g. [".pdf", ".docx"]. If None, process all supported formats
+ recursive: Whether to recursively process subfolders
+ max_workers: Maximum number of concurrent workers
+
+ Returns:
+ Dict with "total", "success", and "failed" counts plus a "failed_files" list
+ """
+ # Ensure LightRAG is initialized
+ await self._ensure_lightrag_initialized()
+
+ folder_path = Path(folder_path)
+ if not folder_path.exists() or not folder_path.is_dir():
+ raise ValueError(f"Folder does not exist or is not a valid directory: {folder_path}")
+
+ # Supported file formats
+ supported_extensions = {
+ ".pdf", ".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif",
+ ".doc", ".docx", ".ppt", ".pptx", ".txt", ".md"
+ }
+
+ # Use specified extensions or all supported formats
+ if file_extensions:
+ target_extensions = set(ext.lower() for ext in file_extensions)
+ # Validate if all are supported formats
+ unsupported = target_extensions - supported_extensions
+ if unsupported:
+ self.logger.warning(f"The following file formats may not be fully supported: {unsupported}")
+ else:
+ target_extensions = supported_extensions
+
+ # Collect all files to process
+ files_to_process = []
+
+ if recursive:
+ # Recursively traverse all subfolders
+ for file_path in folder_path.rglob("*"):
+ if file_path.is_file() and file_path.suffix.lower() in target_extensions:
+ files_to_process.append(file_path)
+ else:
+ # Process only current folder
+ for file_path in folder_path.glob("*"):
+ if file_path.is_file() and file_path.suffix.lower() in target_extensions:
+ files_to_process.append(file_path)
+
+ if not files_to_process:
+ self.logger.info(f"No files to process found in {folder_path}")
+ return
+
+ self.logger.info(f"Found {len(files_to_process)} files to process")
+ self.logger.info("File type distribution:")
+
+ # Count file types
+ file_type_count = {}
+ for file_path in files_to_process:
+ ext = file_path.suffix.lower()
+ file_type_count[ext] = file_type_count.get(ext, 0) + 1
+
+ for ext, count in sorted(file_type_count.items()):
+ self.logger.info(f" {ext}: {count} files")
+
+ # Create progress tracking
+ processed_count = 0
+ failed_files = []
+
+ # Use semaphore to control concurrency
+ semaphore = asyncio.Semaphore(max_workers)
+
+ async def process_single_file(file_path: Path, index: int) -> None:
+ """Process a single file"""
+ async with semaphore:
+ nonlocal processed_count
+ try:
+ self.logger.info(f"[{index}/{len(files_to_process)}] Processing: {file_path}")
+
+ # Create separate output directory for each file
+ file_output_dir = Path(output_dir) / file_path.stem
+ file_output_dir.mkdir(parents=True, exist_ok=True)
+
+ # Process file
+ await self.process_document_complete(
+ file_path=str(file_path),
+ output_dir=str(file_output_dir),
+ parse_method=parse_method,
+ display_stats=display_stats,
+ split_by_character=split_by_character,
+ split_by_character_only=split_by_character_only
+ )
+
+ processed_count += 1
+ self.logger.info(f"[{index}/{len(files_to_process)}] Successfully processed: {file_path}")
+
+ except Exception as e:
+ self.logger.error(f"[{index}/{len(files_to_process)}] Failed to process: {file_path}")
+ self.logger.error(f"Error: {str(e)}")
+ failed_files.append((file_path, str(e)))
+
+ # Create all processing tasks
+ tasks = []
+ for index, file_path in enumerate(files_to_process, 1):
+ task = process_single_file(file_path, index)
+ tasks.append(task)
+
+ # Wait for all tasks to complete
+ await asyncio.gather(*tasks, return_exceptions=True)
+
+ # Output processing statistics
+ self.logger.info("\n===== Batch Processing Complete =====")
+ self.logger.info(f"Total files: {len(files_to_process)}")
+ self.logger.info(f"Successfully processed: {processed_count}")
+ self.logger.info(f"Failed: {len(failed_files)}")
+
+ if failed_files:
+ self.logger.info("\nFailed files:")
+ for file_path, error in failed_files:
+ self.logger.info(f" - {file_path}: {error}")
+
+ return {
+ "total": len(files_to_process),
+ "success": processed_count,
+ "failed": len(failed_files),
+ "failed_files": failed_files
+ }
+
+ async def query_with_multimodal(
+ self,
+ query: str,
+ mode: str = "hybrid"
+ ) -> str:
+ """
+ Query with multimodal content support
+
+ Args:
+ query: Query text
+ mode: LightRAG query mode (e.g. "hybrid")
+
+ Returns:
+ Query result
+ """
+ if self.lightrag is None:
+ raise ValueError(
+ "No LightRAG instance available. "
+ "Please either:\n"
+ "1. Provide a pre-initialized LightRAG instance when creating RAGAnything, or\n"
+ "2. Process documents first using process_document_complete() or process_folder_complete() "
+ "to create and populate the LightRAG instance."
+ )
+
+ result = await self.lightrag.aquery(
+ query,
+ param=QueryParam(mode=mode)
+ )
+
+ return result
+
+ def get_processor_info(self) -> Dict[str, Any]:
+ """Get processor information"""
+ if not self.modal_processors:
+ return {"status": "Not initialized"}
+
+ info = {
+ "status": "Initialized",
+ "processors": {},
+ "models": {
+ "llm_model": "External function" if self.llm_model_func else "Not provided",
+ "vision_model": "External function" if self.vision_model_func else "Not provided",
+ "embedding_model": "External function" if self.embedding_func else "Not provided"
+ }
+ }
+
+ for proc_type, processor in self.modal_processors.items():
+ info["processors"][proc_type] = {
+ "class": processor.__class__.__name__,
+ "supports": self._get_processor_supports(proc_type)
+ }
+
+ return info
+
+ def _get_processor_supports(self, proc_type: str) -> List[str]:
+ """Get processor supported features"""
+ supports_map = {
+ "image": ["Image content analysis", "Visual understanding", "Image description generation", "Image entity extraction"],
+ "table": ["Table structure analysis", "Data statistics", "Trend identification", "Table entity extraction"],
+ "equation": ["Mathematical formula parsing", "Variable identification", "Formula meaning explanation", "Formula entity extraction"],
+ "generic": ["General content analysis", "Structured processing", "Entity extraction"]
+ }
+ return supports_map.get(proc_type, ["Basic processing"])
+
+
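The text/multimodal separation step in `_separate_content` above is the pivot of the whole pipeline: text goes to `ainsert`, everything else to the modal processors. A minimal, dependency-free sketch of that logic is shown below; the sample `content_list` mirrors the MinerU block shape (dicts with a `"type"` key) but is hand-written for illustration, not real MinerU output.

```python
from typing import Any


def separate_content(
    content_list: list[dict[str, Any]],
) -> tuple[str, list[dict[str, Any]]]:
    """Split MinerU-style blocks into joined text and a list of multimodal items."""
    text_parts: list[str] = []
    multimodal_items: list[dict[str, Any]] = []
    for item in content_list:
        if item.get("type", "text") == "text":
            text = item.get("text", "")
            if text.strip():  # drop blank text blocks
                text_parts.append(text)
        else:  # image, table, equation, ...
            multimodal_items.append(item)
    return "\n\n".join(text_parts), multimodal_items


# Illustrative sample (not real MinerU output)
sample = [
    {"type": "text", "text": "Introduction paragraph."},
    {"type": "image", "img_path": "images/fig1.png"},
    {"type": "text", "text": "   "},  # blank text block, dropped
    {"type": "table", "table_body": "| a | b |"},
]
text, modal = separate_content(sample)
print(text)        # -> Introduction paragraph.
print(len(modal))  # -> 2
```

The same shape is what `process_document_complete` relies on: the joined text string feeds the plain-text insertion path, and each item in the multimodal list is dispatched to a processor by its `"type"` field.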