docs: Add comprehensive document processing analysis

Add detailed analysis documentation for RAGFlow's document processing pipeline:
- README.md: Overview and architecture diagram
- task_executor_analysis.md: Task execution pipeline details
- pdf_parsing.md: PDF parsing with layout analysis
- ocr_pipeline.md: PaddleOCR integration and text detection
- layout_detection.md: Detectron2 layout recognition
- table_extraction.md: Table structure recognition (TSR)
- file_type_handlers.md: Handlers for all supported file types

These documents explain the document processing flow so that newcomers
can understand how RAGFlow handles various file formats.
commit 0125ae5e84 (parent 3b7123f176)
Claude, 2025-12-01 09:47:37 +00:00
7 changed files with 3532 additions and 0 deletions

@@ -0,0 +1,210 @@
# 05-DOCUMENT-PROCESSING - Document Parsing Pipeline
## Overview
The document processing pipeline converts raw documents into searchable chunks using layout analysis, OCR, and intelligent chunking.
## Document Processing Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│ DOCUMENT PROCESSING PIPELINE │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ File Upload │────▶│ Task Creation │────▶│ Task Queue │
│ (MinIO) │ │ (MySQL) │ │ (Redis) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ TASK EXECUTOR │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 1. Download file from MinIO │ │
│ │ 2. Select parser based on file type │ │
│ │ 3. Execute parsing pipeline │ │
│ │ 4. Generate embeddings │ │
│ │ 5. Index in Elasticsearch │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────┼─────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ PDF Parser │ │ Office Parser │ │ Text Parser │
│ │ │ │ │ │
│ - Layout detect │ │ - DOCX/XLSX │ │ - TXT/MD/CSV │
│ - OCR │ │ - Table extract │ │ - Direct chunk │
│ - Table struct │ │ - Image embed │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
```
## Directory Structure
```
/rag/
├── svr/
│ └── task_executor.py # Main task executor ⭐
├── app/
│ ├── naive.py # Document parsing logic
│ ├── paper.py # Academic paper parser
│ ├── qa.py # Q&A document parser
│ └── table.py # Structured table parser
├── flow/
│ ├── parser/ # Document parsers
│ ├── splitter/ # Chunking logic
│ └── tokenizer/ # Tokenization
└── nlp/
└── __init__.py # naive_merge() chunking
/deepdoc/
├── parser/
│ └── pdf_parser.py # RAGFlow PDF parser ⭐
├── vision/
│ ├── ocr.py # PaddleOCR integration
│ ├── layout_recognizer.py # Detectron2 layout
│ └── table_structure_recognizer.py # TSR
└── images/
└── ... # Image processing
```
## Files in This Module
| File | Description |
|------|-------|
| [task_executor_analysis.md](./task_executor_analysis.md) | Task execution pipeline |
| [pdf_parsing.md](./pdf_parsing.md) | PDF parsing with layout analysis |
| [ocr_pipeline.md](./ocr_pipeline.md) | OCR with PaddleOCR |
| [layout_detection.md](./layout_detection.md) | Detectron2 layout recognition |
| [table_extraction.md](./table_extraction.md) | Table structure recognition |
| [file_type_handlers.md](./file_type_handlers.md) | Handlers for each file type |
## Processing Flow
```
┌─────────────────────────────────────────────────────────────────┐
│ PDF PROCESSING PIPELINE │
└─────────────────────────────────────────────────────────────────┘
PDF Binary Input
┌─────────────────────────────────────────────────────────────────┐
│ 1. IMAGE EXTRACTION (0-40%) │
│ pdfplumber → PIL Images (3x zoom) │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 2. OCR DETECTION (40-63%) │
│ PaddleOCR → Bounding boxes + Text │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 3. LAYOUT RECOGNITION (63-83%) │
│ Detectron2 → Layout types (Text, Title, Table, Figure) │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 4. TABLE STRUCTURE (TSR) │
│ TableTransformer → Rows, Columns, Cells │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 5. TEXT MERGING │
│ ML-based vertical merge (XGBoost) │
│ Column detection (KMeans) │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 6. CHUNKING │
│ naive_merge() → Token-based chunks │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 7. EMBEDDING + INDEXING │
│ Vector generation → Elasticsearch │
└─────────────────────────────────────────────────────────────────┘
```
## Supported File Types
| Category | Extensions | Parser |
|----------|------------|--------|
| **PDF** | .pdf | RAGFlowPdfParser, PlainParser, VisionParser |
| **Office** | .docx, .xlsx, .pptx | python-docx, openpyxl |
| **Text** | .txt, .md, .csv | Direct reading |
| **Images** | .jpg, .png, .tiff | Vision LLM |
| **Email** | .eml | Email parser |
| **Web** | .html | Beautiful Soup |
## Layout Types Detected
| Type | Description |
|------|-------------|
| Text | Regular body text |
| Title | Section/document titles |
| Figure | Images and diagrams |
| Figure caption | Figure descriptions |
| Table | Data tables |
| Table caption | Table descriptions |
| Header | Page headers |
| Footer | Page footers |
| Reference | Bibliography |
| Equation | Mathematical equations |
## Key Algorithms
### Text Merging (XGBoost)
```
Features:
- Y-distance normalized by char height
- Same layout number
- Ending punctuation patterns
- Beginning character patterns
- Chinese numbering patterns
Output: Merge probability → threshold decision
```
### Column Detection (KMeans)
```
Input: X-coordinates of text boxes
Output: Optimal column assignments
Algorithm:
1. For k = 1 to max_columns:
- Fit KMeans(k)
- Calculate silhouette_score
2. Select k with best score
3. Assign boxes to columns
```
## Configuration
```python
parser_config = {
"chunk_token_num": 512, # Tokens per chunk
"delimiter": "\n。", # Chunk boundaries
"layout_recognize": "DeepDOC", # Layout method
"task_page_size": 12, # Pages per task
}
# Task executor config
MAX_CONCURRENT_TASKS = 5
EMBEDDING_BATCH_SIZE = 16
DOC_BULK_SIZE = 64
```
## Related Files
- `/rag/svr/task_executor.py` - Main executor
- `/deepdoc/parser/pdf_parser.py` - PDF parsing
- `/deepdoc/vision/ocr.py` - OCR engine
- `/rag/nlp/__init__.py` - Chunking algorithms

@@ -0,0 +1,762 @@
# File Type Handlers
## Overview
RAGFlow supports many different file formats, each with its own parser for extracting content and metadata. Documents are processed through the unified chunk() function in `/rag/app/naive.py`, which selects the appropriate parser based on the file extension.
## File Location
```
/deepdoc/parser/ # Individual parsers
/rag/app/naive.py # Main chunk() function
```
## Supported File Types
```
┌─────────────────────────────────────────────────────────────────┐
│ FILE TYPE HANDLERS │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ PDF │ │ Office │ │ Text │ │
│ │ .pdf │ │ .docx .xlsx │ │ .txt .md │ │
│ │ .ppt │ │ .pptx .doc │ │ .csv .json │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ DeepDOC │ │ python-docx │ │ Direct │ │
│ │ MinerU │ │ openpyxl │ │ Read │ │
│ │ Docling │ │ python-pptx │ │ │ │
│ │ TCADP │ │ tika │ │ │ │
│ │ VisionLLM │ │ │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Web │ │ Image │ │
│ │ .html .htm │ │ .jpg .png │ │
│ │ │ │ .tiff │ │
│ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │BeautifulSoup│ │ Vision LLM │ │
│ │ html5lib │ │ │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
```
## Main Entry Point
```python
# /rag/app/naive.py
def chunk(filename, binary=None, from_page=0, to_page=100000,
lang="Chinese", callback=None, **kwargs):
"""
Main chunking function for all file types.
Args:
filename: File name with extension
binary: File binary content
from_page: Start page (for paginated docs)
to_page: End page
lang: Language hint (Chinese/English)
callback: Progress callback(progress, message)
**kwargs: Additional options (parser_config, tenant_id, etc.)
Returns:
List of tokenized chunks ready for indexing
"""
parser_config = kwargs.get("parser_config", {
"chunk_token_num": 512,
"delimiter": "\n!?。;!?",
"layout_recognize": "DeepDOC"
})
# Route to appropriate handler based on extension
if re.search(r"\.pdf$", filename, re.IGNORECASE):
return _handle_pdf(...)
elif re.search(r"\.docx$", filename, re.IGNORECASE):
return _handle_docx(...)
elif re.search(r"\.(csv|xlsx?)$", filename, re.IGNORECASE):
return _handle_excel(...)
# ... more handlers
```
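A hedged usage sketch of this entry point (the import path follows the file location above; depending on the configured parser, extra kwargs such as `tenant_id` may also be required):
```python
# Hedged usage sketch of chunk(); not taken from the repository's own examples.
from rag.app import naive

def progress(prog=None, msg=""):
    # callback(progress, message) as documented above
    print(f"[{prog}] {msg}")

with open("manual.pdf", "rb") as f:
    binary = f.read()

chunks = naive.chunk(
    "manual.pdf",
    binary=binary,
    from_page=0,
    to_page=12,
    lang="English",
    callback=progress,
    parser_config={
        "chunk_token_num": 512,
        "delimiter": "\n!?。;!?",
        "layout_recognize": "DeepDOC",
    },
)
print(len(chunks), "chunks ready for indexing")
```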
## PDF Handler
### Parser Selection
```python
# PDF Parser options in /rag/app/naive.py
PARSERS = {
"deepdoc": by_deepdoc, # Default: OCR + Layout + TSR
"mineru": by_mineru, # MinerU external parser
"docling": by_docling, # Docling external parser
"tcadp": by_tcadp, # Tencent Cloud ADP
"plaintext": by_plaintext, # Plain text or Vision LLM
}
def by_deepdoc(filename, binary=None, from_page=0, to_page=100000,
lang="Chinese", callback=None, **kwargs):
"""
DeepDOC parser: RAGFlow's native PDF parser.
Features:
- OCR with PaddleOCR
- Layout detection with Detectron2
- Table structure recognition
- Figure extraction with Vision LLM
"""
pdf_parser = PdfParser()
sections, tables = pdf_parser(
filename if not binary else binary,
from_page=from_page,
to_page=to_page,
callback=callback
)
# Optional: Vision LLM for figure understanding
tables = vision_figure_parser_pdf_wrapper(
tbls=tables,
callback=callback,
**kwargs
)
return sections, tables, pdf_parser
def by_plaintext(filename, binary=None, from_page=0, to_page=100000,
callback=None, **kwargs):
"""
Plain text or Vision LLM parser.
Options:
- "Plain Text": Extract text only, no layout
- Vision LLM: Use multimodal LLM for understanding
"""
if kwargs.get("layout_recognizer", "") == "Plain Text":
pdf_parser = PlainParser()
else:
vision_model = LLMBundle(
kwargs["tenant_id"],
LLMType.IMAGE2TEXT,
llm_name=kwargs.get("layout_recognizer", "")
)
pdf_parser = VisionParser(vision_model=vision_model, **kwargs)
sections, tables = pdf_parser(
filename if not binary else binary,
from_page=from_page,
to_page=to_page,
callback=callback
)
return sections, tables, pdf_parser
```
### PdfParser Class
```python
# /deepdoc/parser/pdf_parser.py
class RAGFlowPdfParser:
"""
Main PDF parser with full document understanding.
Pipeline:
1. Image extraction (pdfplumber)
2. OCR (PaddleOCR)
3. Layout detection (Detectron2)
4. Table structure recognition
5. Text merging (XGBoost)
6. Table/figure extraction
"""
def __call__(self, filename, from_page=0, to_page=100000,
zoomin=3, callback=None):
# 1. Extract page images
self.__images__(filename, zoomin, from_page, to_page, callback)
# 2. Run OCR
self.__ocr(callback, 0.4, 0.63)
# 3. Layout detection
self._layouts_rec(zoomin)
# 4. Table structure
self._table_transformer_job(zoomin)
# 5. Text merging
self._text_merge(zoomin=zoomin)
self._naive_vertical_merge()
self._concat_downward()
self._final_reading_order_merge()
# 6. Extract tables/figures
tbls = self._extract_table_figure(True, zoomin, True, True)
return [(b["text"], self._line_tag(b, zoomin))
for b in self.boxes], tbls
```
## DOCX Handler
```python
# /deepdoc/parser/docx_parser.py
class RAGFlowDocxParser:
"""
Microsoft Word (.docx) parser.
Features:
- Paragraph extraction with styles
- Table extraction with structure
- Embedded image extraction
- Heading hierarchy for table context
"""
def __call__(self, fnm, from_page=0, to_page=100000):
self.doc = Document(fnm) if isinstance(fnm, str) \
else Document(BytesIO(fnm))
pn = 0 # Current page
lines = []
# Extract paragraphs
for p in self.doc.paragraphs:
if pn > to_page:
break
if from_page <= pn < to_page:
if p.text.strip():
# Get embedded images
current_image = self.get_picture(self.doc, p)
lines.append((
self._clean(p.text),
[current_image],
p.style.name if p.style else ""
))
# Track page breaks
for run in p.runs:
if 'lastRenderedPageBreak' in run._element.xml:
pn += 1
# Extract tables with context
tbls = []
for i, tb in enumerate(self.doc.tables):
title = self._get_nearest_title(i, fnm)
html = self._table_to_html(tb, title)
tbls.append(((None, html), ""))
return lines, tbls
def get_picture(self, document, paragraph):
"""
Extract embedded images from paragraph.
Handles:
- Inline images (blip elements)
- Multiple images (concat together)
- Image format errors (graceful skip)
"""
imgs = paragraph._element.xpath('.//pic:pic')
if not imgs:
return None
res_img = None
for img in imgs:
embed = img.xpath('.//a:blip/@r:embed')
if not embed:
continue
try:
related_part = document.part.related_parts[embed[0]]
image_blob = related_part.image.blob
image = Image.open(BytesIO(image_blob)).convert('RGB')
if res_img is None:
res_img = image
else:
res_img = concat_img(res_img, image)
except Exception:
continue
return res_img
```
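The `_get_nearest_title()` and `_table_to_html()` helpers referenced above are not shown. A hypothetical sketch of the table-to-HTML step, using only the python-docx API, could look like this:
```python
# Hypothetical sketch of a _table_to_html-style helper (not the actual implementation).
from html import escape

def table_to_html(tb, title=""):
    """Render a python-docx table as HTML, using the nearest heading as caption."""
    html = "<table>"
    if title:
        html += f"<caption>{escape(title)}</caption>"
    for i, row in enumerate(tb.rows):
        tag = "th" if i == 0 else "td"          # treat the first row as the header
        cells = "".join(f"<{tag}>{escape(cell.text.strip())}</{tag}>" for cell in row.cells)
        html += f"<tr>{cells}</tr>"
    return html + "</table>"
```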
## Excel Handler
```python
# /deepdoc/parser/excel_parser.py
class RAGFlowExcelParser:
"""
Excel/CSV parser.
Supports:
- .xlsx, .xls (openpyxl, pandas)
- .csv (pandas)
- Multiple sheets
- HTML and Markdown output
"""
def __call__(self, fnm):
"""
Parse Excel to natural language descriptions.
Output format:
"Header1: Value1; Header2: Value2 ——SheetName"
"""
wb = self._load_excel_to_workbook(fnm)
res = []
for sheetname in wb.sheetnames:
ws = wb[sheetname]
rows = list(ws.rows)
if not rows:
continue
# First row as headers
ti = list(rows[0])
# Process data rows
for r in rows[1:]:
fields = []
for i, c in enumerate(r):
if not c.value:
continue
t = str(ti[i].value) if i < len(ti) else ""
t += (": " if t else "") + str(c.value)
fields.append(t)
line = "; ".join(fields)
if sheetname.lower().find("sheet") < 0:
line += " ——" + sheetname
res.append(line)
return res
def html(self, fnm, chunk_rows=256):
"""
Convert to HTML tables with chunking.
Splits large tables into chunks of chunk_rows rows.
"""
wb = self._load_excel_to_workbook(fnm)
tb_chunks = []
for sheetname in wb.sheetnames:
ws = wb[sheetname]
rows = list(ws.rows)
# Build header row
tb_rows_0 = "<tr>"
for t in list(rows[0]):
tb_rows_0 += f"<th>{escape(str(t.value or ''))}</th>"
tb_rows_0 += "</tr>"
# Chunk data rows
for chunk_i in range((len(rows) - 1) // chunk_rows + 1):
tb = f"<table><caption>{sheetname}</caption>"
tb += tb_rows_0
start = 1 + chunk_i * chunk_rows
end = min(start + chunk_rows, len(rows))
for r in rows[start:end]:
tb += "<tr>"
for c in r:
tb += f"<td>{escape(str(c.value or ''))}</td>"
tb += "</tr>"
tb += "</table>\n"
tb_chunks.append(tb)
return tb_chunks
```
## PowerPoint Handler
```python
# /deepdoc/parser/ppt_parser.py
class RAGFlowPptParser:
"""
PowerPoint (.pptx) parser.
Features:
- Slide-by-slide extraction
- Shape hierarchy (text frames, tables, groups)
- Bulleted list formatting
- Embedded table extraction
"""
def __call__(self, fnm, from_page, to_page, callback=None):
ppt = Presentation(fnm) if isinstance(fnm, str) \
else Presentation(BytesIO(fnm))
txts = []
self.total_page = len(ppt.slides)
for i, slide in enumerate(ppt.slides):
if i < from_page:
continue
if i >= to_page:
break
texts = []
# Sort shapes by position (top-to-bottom, left-to-right)
for shape in sorted(slide.shapes,
key=lambda x: (
(x.top or 0) // 10,
x.left or 0
)):
txt = self._extract(shape)
if txt:
texts.append(txt)
txts.append("\n".join(texts))
return txts
def _extract(self, shape):
"""
Extract text from shape recursively.
Handles:
- Text frames with paragraphs
- Tables (shape_type == 19)
- Group shapes (shape_type == 6)
"""
# Text frame
if hasattr(shape, 'has_text_frame') and shape.has_text_frame:
texts = []
for paragraph in shape.text_frame.paragraphs:
if paragraph.text.strip():
texts.append(self._get_bulleted_text(paragraph))
return "\n".join(texts)
shape_type = shape.shape_type
# Table
if shape_type == 19:
tb = shape.table
rows = []
for i in range(1, len(tb.rows)):
rows.append("; ".join([
f"{tb.cell(0, j).text}: {tb.cell(i, j).text}"
for j in range(len(tb.columns))
if tb.cell(i, j)
]))
return "\n".join(rows)
# Group shape
if shape_type == 6:
texts = []
for p in sorted(shape.shapes,
key=lambda x: (x.top // 10, x.left)):
t = self._extract(p)
if t:
texts.append(t)
return "\n".join(texts)
return ""
```
## HTML Handler
```python
# /deepdoc/parser/html_parser.py
class RAGFlowHtmlParser:
"""
HTML parser using BeautifulSoup.
Features:
- Block tag detection (p, div, h1-h6, table, etc.)
- Script/style removal
- Table extraction
- Heading hierarchy to markdown
"""
BLOCK_TAGS = [
"h1", "h2", "h3", "h4", "h5", "h6",
"p", "div", "article", "section", "aside",
"ul", "ol", "li",
"table", "pre", "code", "blockquote",
"figure", "figcaption"
]
TITLE_TAGS = {
"h1": "#", "h2": "##", "h3": "###",
"h4": "####", "h5": "#####", "h6": "######"
}
def __call__(self, fnm, binary=None, chunk_token_num=512):
if binary:
encoding = find_codec(binary)
txt = binary.decode(encoding, errors="ignore")
else:
with open(fnm, "r", encoding=get_encoding(fnm)) as f:
txt = f.read()
return self.parser_txt(txt, chunk_token_num)
@classmethod
def parser_txt(cls, txt, chunk_token_num):
"""
Parse HTML text to chunks.
Process:
1. Clean HTML (remove scripts, styles, comments)
2. Recursively extract text from body
3. Merge blocks by block_id
4. Chunk by token limit
"""
soup = BeautifulSoup(txt, "html5lib")
# Remove unwanted elements
for tag in soup.find_all(["style", "script"]):
tag.decompose()
# Extract text recursively
temp_sections = []
cls.read_text_recursively(
soup.body, temp_sections,
chunk_token_num=chunk_token_num
)
# Merge and chunk
block_txt_list, table_list = cls.merge_block_text(temp_sections)
sections = cls.chunk_block(block_txt_list, chunk_token_num)
# Add tables
for table in table_list:
sections.append(table.get("content", ""))
return sections
```
## Text Handler
```python
# /deepdoc/parser/txt_parser.py
class RAGFlowTxtParser:
"""
Plain text parser with delimiter-based chunking.
Supports:
- .txt, .py, .js, .java, .c, .cpp, .h, .php,
.go, .ts, .sh, .cs, .kt, .sql
"""
def __call__(self, fnm, binary=None, chunk_token_num=128,
delimiter="\n!?;。;!?"):
txt = get_text(fnm, binary)
return self.parser_txt(txt, chunk_token_num, delimiter)
@classmethod
def parser_txt(cls, txt, chunk_token_num=128,
delimiter="\n!?;。;!?"):
"""
Split text by delimiters and chunk by token count.
"""
cks = [""]
tk_nums = [0]
# Parse delimiter (support regex patterns)
dels = cls._parse_delimiter(delimiter)
secs = re.split(r"(%s)" % dels, txt)
for sec in secs:
if re.match(f"^{dels}$", sec):
continue
cls._add_chunk(sec, cks, tk_nums, chunk_token_num)
return [[c, ""] for c in cks]
```
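`_parse_delimiter()` and `_add_chunk()` are referenced but not shown. A minimal sketch of the token-budget accumulation step (the `num_tokens_from_string` helper name is an assumption about the project's token counter):
```python
# Hedged sketch of an _add_chunk-style step: extend the current chunk, or start
# a new one once the token budget is exceeded. num_tokens_from_string is an
# assumption about the project's token counter.
from rag.utils import num_tokens_from_string

def add_chunk(section, cks, tk_nums, chunk_token_num):
    tnum = num_tokens_from_string(section)
    if tk_nums[-1] + tnum <= chunk_token_num:
        cks[-1] += section              # still fits: append to the current chunk
        tk_nums[-1] += tnum
    else:
        cks.append(section)             # over budget: start a new chunk
        tk_nums.append(tnum)
```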
## Markdown Handler
```python
# /deepdoc/parser/markdown_parser.py
class RAGFlowMarkdownParser:
"""
Markdown parser with element extraction.
Features:
- Heading hierarchy detection
- Table extraction (separate or inline)
- Image URL extraction and loading
- Code block handling
"""
def __call__(self, filename, binary=None, separate_tables=True,
delimiter=None, return_section_images=False):
if binary:
encoding = find_codec(binary)
txt = binary.decode(encoding, errors="ignore")
else:
with open(filename, "r") as f:
txt = f.read()
# Extract tables
remainder, tables = self.extract_tables_and_remainder(
f'{txt}\n',
separate_tables=separate_tables
)
# Extract elements with metadata
extractor = MarkdownElementExtractor(txt)
image_refs = self.extract_image_urls_with_lines(txt)
element_sections = extractor.extract_elements(
delimiter,
include_meta=True
)
# Process sections with images
sections = []
section_images = []
image_cache = {}
for element in element_sections:
content = element["content"]
start_line = element["start_line"]
end_line = element["end_line"]
# Find images in section
urls_in_section = [
ref["url"] for ref in image_refs
if start_line <= ref["line"] <= end_line
]
imgs = []
if urls_in_section:
imgs, image_cache = self.load_images_from_urls(
urls_in_section, image_cache
)
combined_image = None
if imgs:
combined_image = reduce(concat_img, imgs) \
if len(imgs) > 1 else imgs[0]
sections.append((content, ""))
section_images.append(combined_image)
# Convert tables to HTML
tbls = []
for table in tables:
html = markdown(table, extensions=['markdown.extensions.tables'])
tbls.append(((None, html), ""))
if return_section_images:
return sections, tbls, section_images
return sections, tbls
```
## JSON Handler
```python
# /deepdoc/parser/json_parser.py
class RAGFlowJsonParser:
"""
JSON/JSONL parser.
Supports:
- .json (single object or array)
- .jsonl, .ldjson (line-delimited JSON)
- Nested object flattening
"""
def __call__(self, binary, chunk_token_num=512):
txt = binary.decode('utf-8', errors='ignore')
# Try parsing as JSONL first
lines = txt.strip().split('\n')
results = []
for line in lines:
try:
obj = json.loads(line)
flat = self._flatten(obj)
results.append(self._to_text(flat))
except json.JSONDecodeError:
# Try as full JSON
try:
data = json.loads(txt)
if isinstance(data, list):
for item in data:
flat = self._flatten(item)
results.append(self._to_text(flat))
else:
flat = self._flatten(data)
results.append(self._to_text(flat))
except Exception:
pass
break
return self._chunk(results, chunk_token_num)
```
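The `_flatten()` and `_to_text()` helpers are referenced above but not shown. A hypothetical sketch of nested-object flattening and text rendering might look like this:
```python
# Hypothetical sketches of _flatten / _to_text (illustrative only).
def flatten(obj, prefix=""):
    """Flatten nested dicts/lists into dotted keys, e.g. {"user.tags.0": "a"}."""
    if not isinstance(obj, (dict, list)):
        return {prefix or "value": obj}
    items = obj.items() if isinstance(obj, dict) else enumerate(obj)
    flat = {}
    for k, v in items:
        key = f"{prefix}.{k}" if prefix else str(k)
        if isinstance(v, (dict, list)):
            flat.update(flatten(v, key))
        else:
            flat[key] = v
    return flat

def to_text(flat):
    """Render a flattened object as 'key: value' lines for chunking."""
    return "\n".join(f"{k}: {v}" for k, v in flat.items())
```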
## File Extension Routing
```python
# In /rag/app/naive.py chunk() function
FILE_HANDLERS = {
# PDF
r"\.pdf$": _handle_pdf,
# Microsoft Office
r"\.docx$": _handle_docx,
r"\.doc$": _handle_doc, # Requires tika
r"\.pptx?$": _handle_ppt,
r"\.(csv|xlsx?)$": _handle_excel,
# Text
r"\.(txt|py|js|java|c|cpp|h|php|go|ts|sh|cs|kt|sql)$": _handle_txt,
r"\.(md|markdown)$": _handle_markdown,
# Web
r"\.(htm|html)$": _handle_html,
# Data
r"\.(json|jsonl|ldjson)$": _handle_json,
}
def chunk(filename, binary=None, ...):
for pattern, handler in FILE_HANDLERS.items():
if re.search(pattern, filename, re.IGNORECASE):
return handler(filename, binary, ...)
raise NotImplementedError(
"file type not supported yet"
)
```
## Configuration
```python
# Parser configuration options
parser_config = {
# Chunking
"chunk_token_num": 512, # Max tokens per chunk
"delimiter": "\n!?。;!?", # Chunk boundaries
"overlapped_percent": 0, # Chunk overlap
# PDF specific
"layout_recognize": "DeepDOC", # DeepDOC, MinerU, Plain Text, etc.
"analyze_hyperlink": True, # Extract URLs from documents
# Excel specific
"html4excel": False, # Output as HTML tables
}
```
## Related Files
- `/deepdoc/parser/__init__.py` - Parser exports
- `/deepdoc/parser/pdf_parser.py` - PDF parser
- `/deepdoc/parser/docx_parser.py` - Word parser
- `/deepdoc/parser/excel_parser.py` - Excel/CSV parser
- `/deepdoc/parser/ppt_parser.py` - PowerPoint parser
- `/deepdoc/parser/html_parser.py` - HTML parser
- `/deepdoc/parser/txt_parser.py` - Text parser
- `/deepdoc/parser/markdown_parser.py` - Markdown parser
- `/deepdoc/parser/json_parser.py` - JSON parser
- `/rag/app/naive.py` - Main chunk() function

@@ -0,0 +1,547 @@
# Layout Detection - Detectron2 Layout Recognition
## Overview
Layout detection is a key step in the document processing pipeline: it classifies the content regions in a document (text, title, table, figure, etc.). RAGFlow uses Detectron2-based models and supports multiple backends (ONNX, YOLOv10, Ascend NPU).
## File Location
```
/deepdoc/vision/layout_recognizer.py
```
## Architecture
```
LAYOUT DETECTION PIPELINE
Page Image
┌─────────────────────────────────────────────────────────────────┐
│ LAYOUT RECOGNIZER │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Model Options: │ │
│ │ - ONNX (default): layout.onnx │ │
│ │ - YOLOv10: layout_yolov10.onnx │ │
│ │ - Ascend NPU: layout.om │ │
│ │ - TensorRT DLA: External service │ │
│ └─────────────────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ DETECTED LAYOUTS │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Layout Types: │ │
│ │ • Text • Table • Header │ │
│ │ • Title • Table caption • Footer │ │
│ │ • Figure • Figure caption • Reference │ │
│ │ • Equation │ │
│ └─────────────────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ TAG OCR BOXES │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ For each OCR box: │ │
│ │ 1. Find overlapping layout region │ │
│ │ 2. Assign layout_type and layoutno │ │
│ │ 3. Filter garbage (headers, footers, page numbers) │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
## Layout Types
| Type | Description | Handling |
|------|-------------|-------|
| Text | Regular body text | Keep as content |
| Title | Section/document titles | Mark as heading |
| Figure | Images, diagrams, charts | Extract image + caption |
| Figure caption | Descriptions below figures | Associate with figure |
| Table | Data tables | Extract structure (TSR) |
| Table caption | Descriptions for tables | Associate with table |
| Header | Page headers | Filter (garbage) |
| Footer | Page footers | Filter (garbage) |
| Reference | Bibliography section | Filter (optional) |
| Equation | Mathematical formulas | Keep as figure |
## Core Implementation
### LayoutRecognizer Class
```python
class LayoutRecognizer(Recognizer):
"""
Base layout recognizer using ONNX model.
Inherits from Recognizer base class for model loading
and inference.
"""
labels = [
"_background_",
"Text",
"Title",
"Figure",
"Figure caption",
"Table",
"Table caption",
"Header",
"Footer",
"Reference",
"Equation",
]
def __init__(self, domain):
"""
Initialize with model from HuggingFace or local.
Args:
domain: Model domain name (e.g., "layout")
"""
model_dir = os.path.join(
get_project_base_directory(),
"rag/res/deepdoc"
)
super().__init__(self.labels, domain, model_dir)
# Layouts to filter out
self.garbage_layouts = ["footer", "header", "reference"]
# Optional TensorRT DLA client
if os.environ.get("TENSORRT_DLA_SVR"):
self.client = DLAClient(os.environ["TENSORRT_DLA_SVR"])
def __call__(self, image_list, ocr_res, scale_factor=3, thr=0.2,
batch_size=16, drop=True):
"""
Detect layouts and tag OCR boxes.
Args:
image_list: List of page images
ocr_res: OCR results per page
scale_factor: Image zoom factor (default 3)
thr: Confidence threshold
batch_size: Inference batch size
drop: Whether to drop garbage layouts
Returns:
- ocr_res: OCR boxes with layout tags
- page_layout: Layout regions per page
"""
```
### Layout Detection Process
```python
def __call__(self, image_list, ocr_res, scale_factor=3, thr=0.2,
batch_size=16, drop=True):
"""
Main layout detection and OCR tagging pipeline.
"""
# 1. Run layout detection
if self.client:
# Use TensorRT DLA service
layouts = self.client.predict(image_list)
else:
# Use local ONNX model
layouts = super().__call__(image_list, thr, batch_size)
boxes = []
garbages = {}
page_layout = []
# 2. Process each page
for pn, lts in enumerate(layouts):
bxs = ocr_res[pn]
# Convert layout format
lts = [{
"type": b["type"],
"score": float(b["score"]),
"x0": b["bbox"][0] / scale_factor,
"x1": b["bbox"][2] / scale_factor,
"top": b["bbox"][1] / scale_factor,
"bottom": b["bbox"][-1] / scale_factor,
"page_number": pn,
} for b in lts if float(b["score"]) >= 0.4 or
b["type"] not in self.garbage_layouts]
# Sort layouts by Y position
lts = self.sort_Y_firstly(lts, np.mean([
lt["bottom"] - lt["top"] for lt in lts
]) / 2)
# Cleanup overlapping layouts
lts = self.layouts_cleanup(bxs, lts)
page_layout.append(lts)
# 3. Tag OCR boxes with layout types
for lt_type in ["footer", "header", "reference",
"figure caption", "table caption",
"title", "table", "text", "figure", "equation"]:
self._findLayout(lt_type, bxs, lts, pn, image_list,
scale_factor, garbages, drop)
# 4. Add unvisited figures
for i, lt in enumerate([lt for lt in lts
if lt["type"] in ["figure", "equation"]]):
if lt.get("visited"):
continue
lt = deepcopy(lt)
del lt["type"]
lt["text"] = ""
lt["layout_type"] = "figure"
lt["layoutno"] = f"figure-{i}"
bxs.append(lt)
boxes.extend(bxs)
# 5. Remove duplicate garbage text
garbag_set = set()
for k in garbages.keys():
garbages[k] = Counter(garbages[k])
for g, c in garbages[k].items():
if c > 1: # Appears on multiple pages
garbag_set.add(g)
ocr_res = [b for b in boxes if b["text"].strip() not in garbag_set]
return ocr_res, page_layout
```
### Layout-OCR Box Matching
```python
def _findLayout(self, ty, bxs, lts, pn, image_list, scale_factor,
garbages, drop):
"""
Find matching layout for each OCR box.
Process:
1. Get all layouts of specified type
2. For each untagged OCR box:
- Check if it's garbage (page numbers, etc.)
- Find overlapping layout region
- Tag with layout type
- Filter garbage layouts if drop=True
"""
lts_of_type = [lt for lt in lts if lt["type"] == ty]
i = 0
while i < len(bxs):
# Skip already tagged boxes
if bxs[i].get("layout_type"):
i += 1
continue
# Check for garbage patterns
if self._is_garbage(bxs[i]):
bxs.pop(i)
continue
# Find overlapping layout
ii = self.find_overlapped_with_threshold(bxs[i], lts_of_type, thr=0.4)
if ii is None:
# No matching layout
bxs[i]["layout_type"] = ""
i += 1
continue
lts_of_type[ii]["visited"] = True
# Check if should keep garbage layout
keep_feats = [
lts_of_type[ii]["type"] == "footer" and
bxs[i]["bottom"] < image_list[pn].size[1] * 0.9 / scale_factor,
lts_of_type[ii]["type"] == "header" and
bxs[i]["top"] > image_list[pn].size[1] * 0.1 / scale_factor,
]
if drop and lts_of_type[ii]["type"] in self.garbage_layouts \
and not any(keep_feats):
# Collect garbage for deduplication
garbages.setdefault(lts_of_type[ii]["type"], []).append(
bxs[i]["text"]
)
bxs.pop(i)
continue
# Tag box with layout info
bxs[i]["layoutno"] = f"{ty}-{ii}"
bxs[i]["layout_type"] = lts_of_type[ii]["type"] \
if lts_of_type[ii]["type"] != "equation" else "figure"
i += 1
```
### Garbage Pattern Detection
```python
def _is_garbage(self, b):
"""
Detect garbage text patterns.
Patterns:
- Bullet points only: "•••"
- Page numbers: "1 / 10", "3 of 15"
- URLs: "http://..."
- Font encoding issues: "(cid:123)"
"""
patt = [
r"^•+$", # Bullet points
"^[0-9]{1,2} / ?[0-9]{1,2}$", # Page X / Y
r"^[0-9]{1,2} of [0-9]{1,2}$", # Page X of Y
"^http://[^ ]{12,}", # URLs
r"\(cid *: *[0-9]+ *\)", # Font encoding
]
return any([re.search(p, b["text"]) for p in patt])
```
## YOLOv10 Variant
```python
class LayoutRecognizer4YOLOv10(LayoutRecognizer):
"""
YOLOv10-based layout recognizer.
Differences from base:
- Different label set
- Custom preprocessing (LetterBox resize)
- YOLO-specific postprocessing
"""
labels = [
"title", "Text", "Reference", "Figure",
"Figure caption", "Table", "Table caption",
"Table caption", "Equation", "Figure caption",
]
def preprocess(self, image_list):
"""
YOLOv10 preprocessing with letterbox resize.
"""
inputs = []
new_shape = self.input_shape
for img in image_list:
shape = img.shape[:2] # H, W
# Scale ratio
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
# Compute padding
new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]
dw /= 2
dh /= 2
# Resize
img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
# Pad
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
img = cv2.copyMakeBorder(
img, top, bottom, left, right,
cv2.BORDER_CONSTANT, value=(114, 114, 114)
)
# Normalize
img = img / 255.0
img = img.transpose(2, 0, 1)[np.newaxis, :].astype(np.float32)
inputs.append({
self.input_names[0]: img,
"scale_factor": [shape[1] / new_unpad[0],
shape[0] / new_unpad[1], dw, dh]
})
return inputs
def postprocess(self, boxes, inputs, thr):
"""
YOLO-specific postprocessing with NMS.
"""
thr = 0.08
boxes = np.squeeze(boxes)
# Filter by score
scores = boxes[:, 4]
boxes = boxes[scores > thr, :]
scores = scores[scores > thr]
if len(boxes) == 0:
return []
class_ids = boxes[:, -1].astype(int)
boxes = boxes[:, :4]
# Remove padding offset
boxes[:, 0] -= inputs["scale_factor"][2]
boxes[:, 2] -= inputs["scale_factor"][2]
boxes[:, 1] -= inputs["scale_factor"][3]
boxes[:, 3] -= inputs["scale_factor"][3]
# Scale to original image
input_shape = np.array([
inputs["scale_factor"][0], inputs["scale_factor"][1],
inputs["scale_factor"][0], inputs["scale_factor"][1]
])
boxes = np.multiply(boxes, input_shape, dtype=np.float32)
# NMS per class
indices = []
for class_id in np.unique(class_ids):
class_mask = class_ids == class_id
class_boxes = boxes[class_mask]
class_scores = scores[class_mask]
class_keep = nms(class_boxes, class_scores, 0.45)
indices.extend(np.where(class_mask)[0][class_keep])
return [{
"type": self.label_list[class_ids[i]].lower(),
"bbox": boxes[i].tolist(),
"score": float(scores[i])
} for i in indices]
```
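The `nms()` call comes from `/deepdoc/vision/operators.py` (see Related Files). For reference, a minimal sketch of standard IoU-based non-maximum suppression, which is what the per-class loop above assumes:
```python
# Minimal sketch of IoU-based NMS over [x0, y0, x1, y1] boxes (illustrative).
import numpy as np

def nms(boxes, scores, iou_thr=0.45):
    x0, y0, x1, y1 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
    areas = (x1 - x0) * (y1 - y0)
    order = scores.argsort()[::-1]               # highest score first
    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(int(i))
        # intersection of the top box with the remaining boxes
        xx0 = np.maximum(x0[i], x0[order[1:]])
        yy0 = np.maximum(y0[i], y0[order[1:]])
        xx1 = np.minimum(x1[i], x1[order[1:]])
        yy1 = np.minimum(y1[i], y1[order[1:]])
        inter = np.maximum(0.0, xx1 - xx0) * np.maximum(0.0, yy1 - yy0)
        iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-6)
        order = order[1:][iou <= iou_thr]        # drop heavily overlapping boxes
    return keep
```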
## Ascend NPU Support
```python
class AscendLayoutRecognizer(Recognizer):
"""
Layout recognizer for Huawei Ascend NPU.
Uses .om (Offline Model) format and ais_bench
for inference.
"""
def __init__(self, domain):
from ais_bench.infer.interface import InferSession
model_dir = os.path.join(
get_project_base_directory(),
"rag/res/deepdoc"
)
model_file_path = os.path.join(model_dir, domain + ".om")
device_id = int(os.getenv("ASCEND_LAYOUT_RECOGNIZER_DEVICE_ID", 0))
self.session = InferSession(
device_id=device_id,
model_path=model_file_path
)
```
## Layout Cleanup
```python
def layouts_cleanup(self, bxs, lts):
"""
Clean up overlapping layout regions.
Process:
1. Remove layouts that don't overlap with any OCR boxes
2. Merge overlapping layouts of same type
3. Adjust boundaries based on OCR boxes
"""
# Implementation in base Recognizer class
pass
def find_overlapped_with_threshold(self, box, layouts, thr=0.4):
"""
Find layout region that overlaps with box.
Args:
box: OCR box with x0, x1, top, bottom
layouts: List of layout regions
thr: Minimum overlap ratio (IoU)
Returns:
Index of best matching layout or None
"""
best_idx = None
best_overlap = 0
for idx, lt in enumerate(layouts):
# Calculate intersection
x_overlap = max(0, min(box["x1"], lt["x1"]) - max(box["x0"], lt["x0"]))
y_overlap = max(0, min(box["bottom"], lt["bottom"]) -
max(box["top"], lt["top"]))
intersection = x_overlap * y_overlap
# Calculate union
box_area = (box["x1"] - box["x0"]) * (box["bottom"] - box["top"])
lt_area = (lt["x1"] - lt["x0"]) * (lt["bottom"] - lt["top"])
union = box_area + lt_area - intersection
# IoU
iou = intersection / union if union > 0 else 0
if iou > thr and iou > best_overlap:
best_overlap = iou
best_idx = idx
return best_idx
```
## Configuration
```python
# Model selection
LAYOUT_RECOGNIZER_TYPE = "onnx" # onnx, yolov10, ascend
# Detection parameters
LAYOUT_DETECTION_PARAMS = {
"threshold": 0.2, # Confidence threshold
"batch_size": 16, # Inference batch size
"scale_factor": 3, # Image zoom factor
"drop_garbage": True, # Filter headers/footers
}
# TensorRT DLA (optional)
TENSORRT_DLA_SVR = None # "http://localhost:8080"
# Ascend NPU (optional)
ASCEND_LAYOUT_RECOGNIZER_DEVICE_ID = 0
```
## Integration with PDF Parser
```python
# In pdf_parser.py
def _layouts_rec(self, zoomin):
"""
Run layout recognition on all pages.
Process:
1. Initialize LayoutRecognizer
2. Run detection on page images
3. Tag OCR boxes with layout types
4. Store layout information for later processing
"""
# Initialize recognizer
self.layout_recognizer = LayoutRecognizer("layout")
# Convert PIL images to numpy
images = [np.array(img) for img in self.page_images]
# Run layout detection and tagging
self.boxes, self.page_layout = self.layout_recognizer(
images,
[self.boxes], # OCR results
scale_factor=zoomin,
thr=0.2,
batch_size=16,
drop=True
)
```
## Related Files
- `/deepdoc/vision/layout_recognizer.py` - Layout detection
- `/deepdoc/vision/recognizer.py` - Base recognizer class
- `/deepdoc/vision/operators.py` - NMS and preprocessing
- `/rag/res/deepdoc/layout.onnx` - ONNX model

@@ -0,0 +1,480 @@
# OCR Pipeline - PaddleOCR Integration
## Overview
The OCR (Optical Character Recognition) pipeline in RAGFlow uses PaddleOCR to extract text from images. The system is optimized for both CPU and GPU, with batch processing and multi-GPU parallel execution.
## File Location
```
/deepdoc/vision/ocr.py
```
## Architecture
```
OCR PIPELINE ARCHITECTURE
Input Image
┌─────────────────────────────────────────────────────────────────┐
│ TEXT DETECTOR │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Model: det.onnx (DBNet) │ │
│ │ - Resize image (max 960px) │ │
│ │ - Normalize: mean=[0.485,0.456,0.406] │ │
│ │ - Detect text regions → Bounding boxes │ │
│ └─────────────────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────────────────┘
┌────────────────────────┐
│ Crop Text Regions │
│ Sort: top→bottom │
│ left→right │
└────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ TEXT RECOGNIZER │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Model: rec.onnx (CRNN + CTC) │ │
│ │ - Resize to 48x320 │ │
│ │ - Batch processing (16 images/batch) │ │
│ │ - CTC decode with character dictionary │ │
│ └─────────────────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────────────────┘
┌────────────────────────┐
│ Filter by confidence │
│ (threshold: 0.5) │
└────────────────────────┘
Text + Bounding Boxes
```
## Core Components
### 1. OCR Class (Main Entry Point)
```python
class OCR:
def __init__(self, model_dir=None):
"""
Initialize OCR with optional model directory.
Features:
- Auto-download models from HuggingFace if not found
- Multi-GPU support via PARALLEL_DEVICES setting
- Model caching for performance
"""
if settings.PARALLEL_DEVICES > 0:
# Create detector/recognizer for each GPU
self.text_detector = []
self.text_recognizer = []
for device_id in range(settings.PARALLEL_DEVICES):
self.text_detector.append(TextDetector(model_dir, device_id))
self.text_recognizer.append(TextRecognizer(model_dir, device_id))
else:
# Single device (CPU or GPU 0)
self.text_detector = [TextDetector(model_dir)]
self.text_recognizer = [TextRecognizer(model_dir)]
self.drop_score = 0.5 # Confidence threshold
def __call__(self, img, device_id=0):
"""
Full OCR pipeline: detect + recognize.
Returns:
List of (bounding_box, (text, confidence))
"""
# 1. Detect text regions
dt_boxes, det_time = self.text_detector[device_id](img)
# 2. Sort boxes (top-to-bottom, left-to-right)
dt_boxes = self.sorted_boxes(dt_boxes)
# 3. Crop and recognize each region
img_crop_list = []
for box in dt_boxes:
img_crop = self.get_rotate_crop_image(img, box)
img_crop_list.append(img_crop)
# 4. Batch recognize
rec_res, rec_time = self.text_recognizer[device_id](img_crop_list)
# 5. Filter by confidence
results = []
for box, (text, score) in zip(dt_boxes, rec_res):
if score >= self.drop_score:
results.append((box.tolist(), (text, score)))
return results
```
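A short usage sketch of the OCR class (the import path follows the file location above; the image file name is hypothetical):
```python
# Hedged usage sketch: run detect + recognize on a single page image.
import numpy as np
from PIL import Image
from deepdoc.vision.ocr import OCR   # import path assumed from the file location above

ocr = OCR()                                      # loads det.onnx and rec.onnx
img = np.array(Image.open("page_0.png").convert("RGB"))
for box, (text, score) in ocr(img):              # device_id defaults to 0
    print(f"{score:.2f}  {text}  @ {box}")
```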
### 2. TextDetector Class
```python
class TextDetector:
"""
Detect text regions using DBNet model.
Input: Image (numpy array)
Output: List of 4-point polygons (bounding boxes)
"""
def __init__(self, model_dir, device_id=None):
# Preprocessing pipeline
self.preprocess_op = [
DetResizeForTest(limit_side_len=960, limit_type="max"),
NormalizeImage(
std=[0.229, 0.224, 0.225],
mean=[0.485, 0.456, 0.406],
scale='1./255.'
),
ToCHWImage(),
]
# Postprocessing: DBNet decode
self.postprocess_op = DBPostProcess(
thresh=0.3,
box_thresh=0.5,
max_candidates=1000,
unclip_ratio=1.5
)
# Load ONNX model
self.predictor, self.run_options = load_model(model_dir, 'det', device_id)
def __call__(self, img):
"""
Detect text regions in image.
Process:
1. Preprocess (resize, normalize)
2. Run inference
3. Postprocess (decode probability map to polygons)
4. Filter small boxes
"""
ori_im = img.copy()
# Preprocess
data = transform({'image': img}, self.preprocess_op)
img_tensor, shape_list = data
# Inference
outputs = self.predictor.run(None, {self.input_tensor.name: img_tensor})
# Postprocess
post_result = self.postprocess_op({"maps": outputs[0]}, shape_list)
dt_boxes = post_result[0]['points']
# Filter small boxes (width or height <= 3)
dt_boxes = self.filter_tag_det_res(dt_boxes, ori_im.shape)
return dt_boxes
```
### 3. TextRecognizer Class
```python
class TextRecognizer:
"""
Recognize text from cropped images using CRNN model.
Input: List of cropped text region images
Output: List of (text, confidence) tuples
"""
def __init__(self, model_dir, device_id=None):
self.rec_image_shape = [3, 48, 320] # C, H, W
self.rec_batch_num = 16
# CTC decoder with character dictionary
self.postprocess_op = CTCLabelDecode(
character_dict_path=os.path.join(model_dir, "ocr.res"),
use_space_char=True
)
# Load ONNX model
self.predictor, self.run_options = load_model(model_dir, 'rec', device_id)
def __call__(self, img_list):
"""
Recognize text from list of images.
Process:
1. Sort by width for efficient batching
2. Resize and normalize each image
3. Batch inference
4. CTC decode
"""
img_num = len(img_list)
# Sort by aspect ratio (width/height)
width_list = [img.shape[1] / float(img.shape[0]) for img in img_list]
indices = np.argsort(np.array(width_list))
rec_res = [['', 0.0]] * img_num
# Process in batches
for beg_idx in range(0, img_num, self.rec_batch_num):
end_idx = min(img_num, beg_idx + self.rec_batch_num)
# Prepare batch
norm_img_batch = []
max_wh_ratio = self.rec_image_shape[2] / self.rec_image_shape[1]
for idx in range(beg_idx, end_idx):
h, w = img_list[indices[idx]].shape[0:2]
max_wh_ratio = max(max_wh_ratio, w / h)
for idx in range(beg_idx, end_idx):
norm_img = self.resize_norm_img(
img_list[indices[idx]],
max_wh_ratio
)
norm_img_batch.append(norm_img[np.newaxis, :])
norm_img_batch = np.concatenate(norm_img_batch)
# Inference
outputs = self.predictor.run(None, {
self.input_tensor.name: norm_img_batch
})
# CTC decode
preds = outputs[0]
rec_result = self.postprocess_op(preds)
# Store results in original order
for i, result in enumerate(rec_result):
rec_res[indices[beg_idx + i]] = result
return rec_res
```
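`resize_norm_img()` is used above but not shown. A sketch of the standard CRNN preprocessing implied by `rec_image_shape = [3, 48, 320]` (resize to height 48, right-pad to the batch's max width, scale to [-1, 1]):
```python
# Hedged sketch of resize_norm_img: fixed-height resize, right-pad, scale to [-1, 1].
import math
import cv2
import numpy as np

def resize_norm_img(img, max_wh_ratio, rec_image_shape=(3, 48, 320)):
    imgC, imgH, _ = rec_image_shape
    imgW = int(imgH * max_wh_ratio)                 # batch-wide target width
    h, w = img.shape[:2]
    resized_w = min(imgW, int(math.ceil(imgH * w / h)))
    resized = cv2.resize(img, (resized_w, imgH)).astype("float32")
    resized = resized.transpose((2, 0, 1)) / 255.0  # HWC -> CHW, scale to [0, 1]
    resized = (resized - 0.5) / 0.5                 # scale to [-1, 1]
    padded = np.zeros((imgC, imgH, imgW), dtype=np.float32)
    padded[:, :, :resized_w] = resized              # zero-pad on the right
    return padded
```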
## Model Loading
```python
def load_model(model_dir, nm, device_id=None):
"""
Load ONNX model with GPU/CPU support.
Features:
- Model caching (avoid reloading)
- Auto GPU detection
- Configurable GPU memory limit
"""
model_file_path = os.path.join(model_dir, nm + ".onnx")
# Check cache
global loaded_models
cache_key = model_file_path + str(device_id)
if cache_key in loaded_models:
return loaded_models[cache_key]
# Configure session
options = ort.SessionOptions()
options.enable_cpu_mem_arena = False
options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
options.intra_op_num_threads = 2
options.inter_op_num_threads = 2
# GPU configuration
if cuda_is_available():
gpu_mem_limit_mb = int(os.environ.get("OCR_GPU_MEM_LIMIT_MB", "2048"))
cuda_provider_options = {
"device_id": device_id or 0,
"gpu_mem_limit": gpu_mem_limit_mb * 1024 * 1024,
"arena_extend_strategy": "kNextPowerOfTwo"
}
sess = ort.InferenceSession(
model_file_path,
options=options,
providers=['CUDAExecutionProvider'],
provider_options=[cuda_provider_options]
)
else:
sess = ort.InferenceSession(
model_file_path,
options=options,
providers=['CPUExecutionProvider']
)
# Cache and return
run_options = ort.RunOptions()
loaded_models[cache_key] = (sess, run_options)
return loaded_models[cache_key]
```
## Image Processing Utilities
### Rotate Crop Image
```python
def get_rotate_crop_image(self, img, points):
"""
Crop text region with perspective transform.
Handles rotated/skewed text by:
1. Calculate crop dimensions
2. Apply perspective transform
3. Auto-rotate if height > width
"""
assert len(points) == 4, "shape of points must be 4*2"
# Calculate target dimensions
img_crop_width = int(max(
np.linalg.norm(points[0] - points[1]),
np.linalg.norm(points[2] - points[3])
))
img_crop_height = int(max(
np.linalg.norm(points[0] - points[3]),
np.linalg.norm(points[1] - points[2])
))
# Standard rectangle coordinates
pts_std = np.float32([
[0, 0],
[img_crop_width, 0],
[img_crop_width, img_crop_height],
[0, img_crop_height]
])
# Perspective transform
M = cv2.getPerspectiveTransform(points, pts_std)
dst_img = cv2.warpPerspective(
img, M, (img_crop_width, img_crop_height),
borderMode=cv2.BORDER_REPLICATE,
flags=cv2.INTER_CUBIC
)
# Auto-rotate if needed (height/width >= 1.5)
if dst_img.shape[0] / dst_img.shape[1] >= 1.5:
# Try different rotations, pick best recognition score
best_img = self._find_best_rotation(dst_img)
return best_img
return dst_img
```
### Box Sorting
```python
def sorted_boxes(self, dt_boxes):
"""
Sort text boxes: top-to-bottom, left-to-right.
Algorithm:
1. Initial sort by (y, x) coordinates
2. Fine-tune: swap adjacent boxes if on same line
and right box is to the left
"""
num_boxes = dt_boxes.shape[0]
# Sort by top-left corner (y first, then x)
sorted_boxes = sorted(dt_boxes, key=lambda x: (x[0][1], x[0][0]))
_boxes = list(sorted_boxes)
# Fine-tune for same-line boxes
for i in range(num_boxes - 1):
for j in range(i, -1, -1):
# If boxes on same line (y diff < 10) and wrong order
if abs(_boxes[j + 1][0][1] - _boxes[j][0][1]) < 10 and \
_boxes[j + 1][0][0] < _boxes[j][0][0]:
# Swap
_boxes[j], _boxes[j + 1] = _boxes[j + 1], _boxes[j]
else:
break
return _boxes
```
## Configuration
```python
# Environment variables
OCR_GPU_MEM_LIMIT_MB = 2048 # GPU memory limit per model
OCR_ARENA_EXTEND_STRATEGY = "kNextPowerOfTwo" # Memory allocation strategy
PARALLEL_DEVICES = 0 # Number of GPUs (0 = single device)
# Model parameters
DETECTION_PARAMS = {
"limit_side_len": 960, # Max image dimension
"thresh": 0.3, # Binary threshold
"box_thresh": 0.5, # Box confidence threshold
"max_candidates": 1000, # Max detected boxes
"unclip_ratio": 1.5 # Box expansion ratio
}
RECOGNITION_PARAMS = {
"image_shape": [3, 48, 320], # Input shape (C, H, W)
"batch_num": 16, # Batch size
"drop_score": 0.5 # Confidence threshold
}
```
## Models Used
| Model | File | Purpose | Architecture |
|-------|------|---------|--------------|
| Text Detection | det.onnx | Find text regions | DBNet (Differentiable Binarization) |
| Text Recognition | rec.onnx | Read text content | CRNN + CTC |
| Character Dict | ocr.res | Character mapping | CTC vocabulary |
## Integration with PDF Parser
```python
# In pdf_parser.py
def __ocr(self, callback, start_progress, end_progress):
"""
Run OCR on PDF page images.
For each page:
1. Call OCR to get text boxes with positions
2. Convert coordinates to page coordinate system
3. Store boxes with page number for later processing
"""
self.boxes = []
for page_idx, img in enumerate(self.page_images):
# Get OCR results
results = self.ocr(img)
if not results:
continue
# Convert to internal format
for box, (text, score) in results:
x0 = min(p[0] for p in box)
x1 = max(p[0] for p in box)
y0 = min(p[1] for p in box)
y1 = max(p[1] for p in box)
self.boxes.append({
"x0": x0 / self.ZM,
"x1": x1 / self.ZM,
"top": y0 / self.ZM + self.page_cum_height[page_idx],
"bottom": y1 / self.ZM + self.page_cum_height[page_idx],
"text": text,
"page_number": page_idx,
"score": score
})
# Update progress
if callback:
progress = start_progress + (end_progress - start_progress) * \
(page_idx / len(self.page_images))
callback(progress, f"OCR page {page_idx + 1}")
```
## Related Files
- `/deepdoc/vision/ocr.py` - Main OCR implementation
- `/deepdoc/vision/operators.py` - Image preprocessing operators
- `/deepdoc/vision/postprocess.py` - DBNet and CTC postprocessing
- `/rag/res/deepdoc/` - Model files (det.onnx, rec.onnx, ocr.res)

@@ -0,0 +1,466 @@
# PDF Parsing Pipeline
## Overview
The RAGFlow PDF parser combines OCR, layout detection, and table structure recognition to extract structured content from PDFs.
## File Location
```
/deepdoc/parser/pdf_parser.py
```
## Processing Pipeline
```
┌─────────────────────────────────────────────────────────────────┐
│ PDF PARSING PIPELINE │
└─────────────────────────────────────────────────────────────────┘
PDF Binary
┌─────────────────────────────────────────────────────────────────┐
│ 1. __images__() [0-40%] │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ pdfplumber.open(pdf_binary) │ │
│ │ for page in pdf.pages: │ │
│ │ img = page.to_image(resolution=72*ZM) │ │
│ │ images.append(img.original) # PIL Image │ │
│ └─────────────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 2. __ocr() [40-63%] │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ For each page image: │ │
│ │ - PaddleOCR.detect() → text regions │ │
│ │ - PaddleOCR.recognize() → text content │ │
│ │ Output: bxs = [{x0, x1, top, bottom, text}, ...] │ │
│ └─────────────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 3. _layouts_rec() [63-83%] │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Detectron2 layout detection: │ │
│ │ - Text, Title, Table, Figure, Header, Footer, etc. │ │
│ │ Tag OCR boxes with layout_type │ │
│ └─────────────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 4. _table_transformer_job() [Table TSR] │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ For tables detected: │ │
│ │ - Crop table region │ │
│ │ - Run TableStructureRecognizer │ │
│ │ - Detect rows, columns, cells │ │
│ └─────────────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 5. Text Merging Pipeline │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ _text_merge() → Horizontal merge │ │
│ │ _assign_column() → KMeans column detection │ │
│ │ _naive_vertical_merge() → XGBoost vertical merge │ │
│ │ _final_reading_order_merge() → Reading order │ │
│ └─────────────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 6. _extract_table_figure() [83-100%] │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ - Separate tables/figures from text │ │
│ │ - Find and associate captions │ │
│ │ - Crop images for tables/figures │ │
│ │ - Convert table structure to natural language │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
## RAGFlowPdfParser Class
```python
class RAGFlowPdfParser:
ZM = 3 # Zoom factor for image extraction
def __init__(self):
self.ocr = OCR()
self.layout_recognizer = LayoutRecognizer()
self.tsr = TableStructureRecognizer()
def parse_into_bboxes(self, filename, callback=None):
"""
Main parsing method.
Returns:
List of text boxes with layout information
"""
# 1. Extract images
self.__images__(filename, callback, 0, 0.4)
# 2. OCR detection
self.__ocr(callback, 0.4, 0.63)
# 3. Layout recognition
self._layouts_rec(callback, 0.63, 0.83)
# 4. Table structure recognition
self._table_transformer_job()
# 5. Text merging
self._text_merge()
self._assign_column()
self._naive_vertical_merge()
self._final_reading_order_merge()
# 6. Extract tables/figures
return self._extract_table_figure(callback, 0.83, 1.0)
```
## Image Extraction
```python
def __images__(self, filename, callback, start_progress, end_progress):
"""
Extract page images from PDF.
"""
self.pdf = pdfplumber.open(filename)
self.page_images = []
self.page_cum_heights = [0]
total = len(self.pdf.pages)
for i, page in enumerate(self.pdf.pages):
# Convert to image with 3x zoom
img = page.to_image(resolution=72 * self.ZM)
self.page_images.append(img.original)
# Track cumulative heights for coordinate mapping
self.page_cum_heights.append(
self.page_cum_heights[-1] + page.height * self.ZM
)
# Progress callback
if callback:
progress = start_progress + (end_progress - start_progress) * (i / total)
callback(progress, f"Extracting page {i+1}/{total}")
```
## OCR Processing
```python
def __ocr(self, callback, start_progress, end_progress):
"""
Run OCR on all pages.
"""
self.bxs = [] # All text boxes
for page_idx, img in enumerate(self.page_images):
# Detect text regions
detections = self.ocr.detect(img)
if not detections:
continue
# Recognize text in regions
for det in detections:
x0, y0, x1, y1 = det["box"]
confidence = det["confidence"]
# Crop region
region_img = img.crop((x0, y0, x1, y1))
# Recognize
text = self.ocr.recognize(region_img)
if text.strip():
self.bxs.append({
"x0": x0,
"x1": x1,
"top": y0 + self.page_cum_heights[page_idx],
"bottom": y1 + self.page_cum_heights[page_idx],
"text": text,
"page_num": page_idx,
"confidence": confidence
})
# Progress
if callback:
progress = start_progress + (end_progress - start_progress) * (page_idx / len(self.page_images))
callback(progress, f"OCR page {page_idx+1}")
```
## Layout Recognition
```python
def _layouts_rec(self, callback, start_progress, end_progress):
"""
Detect layout types for text boxes.
"""
for page_idx, img in enumerate(self.page_images):
# Run layout detection
layouts = self.layout_recognizer.detect(img)
# Tag OCR boxes with layout type
for layout in layouts:
lx0, ly0, lx1, ly1 = layout["box"]
layout_type = layout["type"] # Text, Title, Table, etc.
layout_num = layout["num"]
# Find overlapping OCR boxes
for bx in self.bxs:
if bx["page_num"] != page_idx:
continue
# Check overlap
if self._overlaps(bx, (lx0, ly0, lx1, ly1)):
bx["layout_type"] = layout_type
bx["layout_num"] = layout_num
# Progress
if callback:
progress = start_progress + (end_progress - start_progress) * (page_idx / len(self.page_images))
callback(progress, f"Layout detection page {page_idx+1}")
```
## Text Merging
```python
def _text_merge(self):
"""
Horizontal merge of adjacent boxes with same layout.
"""
# Sort by position
self.bxs.sort(key=lambda b: (b["page_num"], b["top"], b["x0"]))
merged = []
current = None
for bx in self.bxs:
if current is None:
current = bx
continue
# Check if should merge
if self._should_merge_horizontal(current, bx):
# Merge
current["x1"] = bx["x1"]
current["text"] += " " + bx["text"]
else:
merged.append(current)
current = bx
if current:
merged.append(current)
self.bxs = merged
def _assign_column(self):
"""
Detect columns using KMeans clustering.
"""
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
# Get X coordinates
x_coords = np.array([[b["x0"]] for b in self.bxs])
best_k = 1
best_score = -1
# Find optimal number of columns
for k in range(1, min(5, len(self.bxs))):
if k >= len(self.bxs):
break
km = KMeans(n_clusters=k, random_state=42)
labels = km.fit_predict(x_coords)
if k > 1:
score = silhouette_score(x_coords, labels)
if score > best_score:
best_score = score
best_k = k
# Assign columns
km = KMeans(n_clusters=best_k, random_state=42)
labels = km.fit_predict(x_coords)
for i, bx in enumerate(self.bxs):
bx["col_id"] = labels[i]
def _naive_vertical_merge(self):
"""
Vertical merge using XGBoost model.
"""
model = load_model("updown_concat_xgb.model")
merged = []
current = None
for bx in self.bxs:
if current is None:
current = bx
continue
# Extract features
features = self._extract_merge_features(current, bx)
# Predict
prob = model.predict_proba([features])[0][1]
if prob > 0.5:
# Merge
current["bottom"] = bx["bottom"]
current["text"] += "\n" + bx["text"]
else:
merged.append(current)
current = bx
if current:
merged.append(current)
self.bxs = merged
```
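`_should_merge_horizontal()` is referenced but not shown. A hypothetical sketch of the check (same page and layout region, roughly the same line, small horizontal gap); the thresholds are illustrative:
```python
# Hypothetical sketch of _should_merge_horizontal (thresholds are illustrative).
def should_merge_horizontal(left, right, max_gap_ratio=1.5):
    if left["page_num"] != right["page_num"]:
        return False
    if left.get("layout_num") != right.get("layout_num"):
        return False
    char_h = max(left["bottom"] - left["top"], 1)
    # the boxes must sit on roughly the same line ...
    if abs(left["top"] - right["top"]) > char_h * 0.5:
        return False
    # ... and the horizontal gap must stay small relative to character height
    return 0 <= right["x0"] - left["x1"] <= char_h * max_gap_ratio
```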
## Merge Features
```python
def _extract_merge_features(self, top_box, bottom_box):
"""
Extract features for vertical merge decision.
Returns 36+ features including:
- Y-distance normalized
- Same layout number
- Ending punctuation patterns
- Beginning character patterns
- Chinese numbering patterns
"""
features = []
# Distance features
y_dist = bottom_box["top"] - top_box["bottom"]
char_height = top_box["bottom"] - top_box["top"]
features.append(y_dist / char_height if char_height > 0 else 0)
# Layout features
features.append(1 if top_box.get("layout_num") == bottom_box.get("layout_num") else 0)
# Text pattern features
top_text = top_box["text"]
bottom_text = bottom_box["text"]
# Ending punctuation
features.append(1 if top_text.endswith((".", "。", "!", "?", "", "")) else 0)
features.append(1 if top_text.endswith((",", "", ";", "")) else 0)
# Beginning patterns
features.append(1 if bottom_text[0:1].isupper() else 0)
features.append(1 if re.match(r"^[一二三四五六七八九十]+、", bottom_text) else 0)
features.append(1 if re.match(r"^第[一二三四五六七八九十]+章", bottom_text) else 0)
# ... more features
return features
```
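For context, `updown_concat_xgb.model` is a pre-trained binary classifier shipped with RAGFlow. The sketch below only illustrates how such a model could be trained and queried with the same `predict_proba` call pattern; the feature matrix is random placeholder data, not the real training set, and the hyperparameters are illustrative.

```python
import numpy as np
from xgboost import XGBClassifier

# Placeholder data: 36-dimensional feature vectors and a binary label
# (1 = the upper and lower boxes belong to the same paragraph).
X = np.random.rand(1000, 36)
y = np.random.randint(0, 2, 1000)

clf = XGBClassifier(n_estimators=200, max_depth=6)
clf.fit(X, y)

# Same call pattern as _naive_vertical_merge(): probability of "merge"
prob_merge = clf.predict_proba(X[:1])[0][1]
print(round(float(prob_merge), 3))
```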
## Table Extraction
```python
def _extract_table_figure(self, callback, start_progress, end_progress):
"""
Extract tables and figures with captions.
"""
results = []
for bx in self.bxs:
layout_type = bx.get("layout_type", "text")
if layout_type == "table":
# Get table content from TSR
table_content = self._get_table_content(bx)
# Find caption
caption = self._find_caption(bx, "table")
results.append({
"type": "table",
"content": table_content,
"caption": caption,
"positions": [(bx["page_num"], bx["x0"], bx["x1"], bx["top"], bx["bottom"])]
})
elif layout_type == "figure":
# Crop figure image
fig_img = self._crop_region(bx)
# Find caption
caption = self._find_caption(bx, "figure")
results.append({
"type": "figure",
"image": fig_img,
"caption": caption,
"positions": [(bx["page_num"], bx["x0"], bx["x1"], bx["top"], bx["bottom"])]
})
else:
# Regular text
results.append({
"type": "text",
"content": bx["text"],
"positions": [(bx["page_num"], bx["x0"], bx["x1"], bx["top"], bx["bottom"])]
})
return results
def _get_table_content(self, table_box):
"""
Convert table structure to natural language.
Example output:
"Row 1, Column Name: Value
Row 2, Column Name: Value"
"""
# Get TSR results for this table
tsr_result = self.table_structures.get(table_box["layout_num"])
if not tsr_result:
return table_box["text"]
# Build natural language representation
lines = []
for row_idx, row in enumerate(tsr_result["rows"]):
for col_idx, cell in enumerate(row["cells"]):
col_name = tsr_result["headers"][col_idx] if col_idx < len(tsr_result["headers"]) else f"Column {col_idx+1}"
lines.append(f"Row {row_idx+1}, {col_name}: {cell['text']}")
return "\n".join(lines)
```
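As a self-contained illustration with hypothetical data (the `headers`/`rows`/`cells` field names follow the structure assumed above), the same row-by-row rendering produces:

```python
# Hypothetical TSR result for a small two-column table
tsr_result = {
    "headers": ["Name", "Score"],
    "rows": [
        {"cells": [{"text": "Alice"}, {"text": "91"}]},
        {"cells": [{"text": "Bob"}, {"text": "87"}]},
    ],
}

lines = []
for row_idx, row in enumerate(tsr_result["rows"]):
    for col_idx, cell in enumerate(row["cells"]):
        col_name = tsr_result["headers"][col_idx]
        lines.append(f"Row {row_idx+1}, {col_name}: {cell['text']}")

print("\n".join(lines))
# Row 1, Name: Alice
# Row 1, Score: 91
# Row 2, Name: Bob
# Row 2, Score: 87
```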
## Configuration
```python
# PDF parser configuration
{
"layout_recognize": "DeepDOC", # DeepDOC, Plain, Vision
"ocr_timeout": 60, # OCR timeout seconds
"max_page_size": 4096, # Max image dimension
"zoom_factor": 3, # Image zoom for OCR
}
```
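As one example of how these settings interact, `zoom_factor` controls the rendering resolution used for OCR while `max_page_size` caps the resulting image dimension. The sketch below shows one way the clamping could work; it is illustrative only and the parser's actual scaling code may differ.

```python
def effective_zoom(page_w_pt: float, page_h_pt: float,
                   zoom_factor: int = 3, max_page_size: int = 4096) -> float:
    """Reduce the zoom so that neither rendered dimension exceeds max_page_size."""
    w, h = page_w_pt * zoom_factor, page_h_pt * zoom_factor
    longest = max(w, h)
    if longest <= max_page_size:
        return float(zoom_factor)
    return zoom_factor * max_page_size / longest

# An A4 page (595 x 842 pt) at zoom 3 stays under 4096 px, so the zoom is unchanged:
print(effective_zoom(595, 842))   # 3.0
```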
## Related Files
- `/deepdoc/parser/pdf_parser.py` - Main parser
- `/deepdoc/vision/ocr.py` - OCR engine
- `/deepdoc/vision/layout_recognizer.py` - Layout detection
- `/deepdoc/vision/table_structure_recognizer.py` - TSR
# Table Structure Recognition (TSR)
## Overview
Table Structure Recognition (TSR) is the component that handles table structure in documents. It analyzes the table regions detected by the Layout Recognizer to determine rows, columns, cells, and header structure. The result is used to convert tables into HTML or a natural-language format.
## File Location
```
/deepdoc/vision/table_structure_recognizer.py
```
## Architecture
```
TABLE STRUCTURE RECOGNITION PIPELINE
Table Image Region
┌─────────────────────────────────────────────────────────────────┐
│ TABLE TRANSFORMER │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Model: tsr.onnx (TableTransformer) │ │
│ │ Detected Elements: │ │
│ │ • table • table column header │ │
│ │ • table column • table projected row header │ │
│ │ • table row • table spanning cell │ │
│ └─────────────────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ STRUCTURE ALIGNMENT │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ • Align rows: left & right edges │ │
│ │ • Align columns: top & bottom edges │ │
│ │ • Handle spanning cells │ │
│ └─────────────────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ TABLE CONSTRUCTION │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ • Map OCR boxes to cells │ │
│ │ • Identify header rows │ │
│ │ • Calculate colspan/rowspan │ │
│ │ • Output: HTML table or Natural language │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
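A minimal usage sketch, assuming a table region has already been detected by the layout recognizer and cropped from the rendered page image. The class name, call signature, and output fields match the implementation shown later in this document; the blank image below is only a stand-in for a real crop.

```python
import numpy as np
from deepdoc.vision.table_structure_recognizer import TableStructureRecognizer

tsr = TableStructureRecognizer()

# Stand-in for a real cropped table region (H x W x 3, RGB)
table_img = np.zeros((400, 800, 3), dtype=np.uint8)

structures = tsr([table_img], thr=0.2)
for element in (structures[0] if structures else []):
    # Each element: {"label", "score", "x0", "x1", "top", "bottom"}
    print(element["label"], round(element["score"], 2))
```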
## TSR Labels
| Label | Description |
|-------|-------------|
| table | Overall table boundary |
| table column | Vertical column dividers |
| table row | Horizontal row dividers |
| table column header | Header row(s) at top |
| table projected row header | Row headers on left side |
| table spanning cell | Merged cells (colspan/rowspan) |
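Conceptually, the detected rows and columns are overlapping stripes across the table, and a cell is the intersection of one row stripe with one column stripe. The sketch below shows that idea only; the production code additionally aligns stripe edges and resolves spanning cells, as described in the following sections. The function name and shape of the output are illustrative.

```python
def cells_from_stripes(rows, cols):
    """Illustrative only: build cell rectangles as intersections of detected
    'table row' and 'table column' boxes (each a dict with x0/x1/top/bottom)."""
    cells = []
    for ri, r in enumerate(rows):
        for ci, c in enumerate(cols):
            cells.append({
                "rn": ri, "cn": ci,
                "x0": c["x0"], "x1": c["x1"],            # horizontal extent from the column
                "top": r["top"], "bottom": r["bottom"],  # vertical extent from the row
            })
    return cells
```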
## Core Implementation
### TableStructureRecognizer Class
```python
class TableStructureRecognizer(Recognizer):
"""
Recognize table structure (rows, columns, cells).
Uses TableTransformer model to detect:
- Row and column boundaries
- Header regions
- Spanning (merged) cells
"""
labels = [
"table",
"table column",
"table row",
"table column header",
"table projected row header",
"table spanning cell",
]
def __init__(self):
model_dir = os.path.join(
get_project_base_directory(),
"rag/res/deepdoc"
)
super().__init__(self.labels, "tsr", model_dir)
def __call__(self, images, thr=0.2):
"""
Detect table structure in images.
Args:
images: List of cropped table images
thr: Confidence threshold
Returns:
List of table structures with aligned rows/columns
"""
# Run inference
tbls = super().__call__(images, thr)
res = []
for tbl in tbls:
# Convert to internal format
lts = [{
"label": b["type"],
"score": b["score"],
"x0": b["bbox"][0],
"x1": b["bbox"][2],
"top": b["bbox"][1],
"bottom": b["bbox"][-1],
} for b in tbl]
if not lts:
continue
# Align row boundaries (left & right)
lts = self._align_rows(lts)
# Align column boundaries (top & bottom)
lts = self._align_columns(lts)
res.append(lts)
return res
```
### Row/Column Alignment
```python
def _align_rows(self, lts):
"""
Align row boundaries to consistent left/right edges.
Process:
1. Find all row and header elements
2. Calculate mean left/right position
3. Adjust elements to align
"""
# Get row elements
row_elements = [b for b in lts
if b["label"].find("row") > 0 or
b["label"].find("header") > 0]
if not row_elements:
return lts
# Calculate alignment positions
left_positions = [b["x0"] for b in row_elements]
right_positions = [b["x1"] for b in row_elements]
left = np.mean(left_positions) if len(left_positions) > 4 \
else np.min(left_positions)
right = np.mean(right_positions) if len(right_positions) > 4 \
else np.max(right_positions)
# Align rows
for b in lts:
if b["label"].find("row") > 0 or b["label"].find("header") > 0:
if b["x0"] > left:
b["x0"] = left
if b["x1"] < right:
b["x1"] = right
return lts
def _align_columns(self, lts):
"""
Align column boundaries to consistent top/bottom edges.
"""
# Get column elements
col_elements = [b for b in lts if b["label"] == "table column"]
if not col_elements:
return lts
# Calculate alignment positions
top_positions = [b["top"] for b in col_elements]
bottom_positions = [b["bottom"] for b in col_elements]
top = np.median(top_positions) if len(top_positions) > 4 \
else np.min(top_positions)
bottom = np.median(bottom_positions) if len(bottom_positions) > 4 \
else np.max(bottom_positions)
# Align columns
for b in lts:
if b["label"] == "table column":
if b["top"] > top:
b["top"] = top
if b["bottom"] < bottom:
b["bottom"] = bottom
return lts
```
### Table Construction
```python
@staticmethod
def construct_table(boxes, is_english=False, html=True, **kwargs):
"""
Construct table from OCR boxes with structure info.
Args:
boxes: OCR boxes with row/column assignments
is_english: Language setting
html: Output HTML (True) or natural language (False)
Returns:
HTML string or list of natural language descriptions
"""
# 1. Extract and remove caption
cap = ""
i = 0
while i < len(boxes):
if TableStructureRecognizer.is_caption(boxes[i]):
cap += boxes[i]["text"]
boxes.pop(i)
else:
i += 1
if not boxes:
return []
# 2. Classify block types
for b in boxes:
b["btype"] = TableStructureRecognizer.blockType(b)
max_type = Counter([b["btype"] for b in boxes]).most_common(1)[0][0]
# 3. Sort and assign row numbers
rowh = [b["R_bott"] - b["R_top"] for b in boxes if "R" in b]
rowh = np.min(rowh) if rowh else 0
boxes = Recognizer.sort_R_firstly(boxes, rowh / 2)
boxes[0]["rn"] = 0
rows = [[boxes[0]]]
btm = boxes[0]["bottom"]
for b in boxes[1:]:
b["rn"] = len(rows) - 1
lst_r = rows[-1]
# Check if new row
if lst_r[-1].get("R", "") != b.get("R", "") or \
(b["top"] >= btm - 3 and
lst_r[-1].get("R", "-1") != b.get("R", "-2")):
btm = b["bottom"]
b["rn"] += 1
rows.append([b])
continue
btm = (btm + b["bottom"]) / 2.0
rows[-1].append(b)
# 4. Sort and assign column numbers
colwm = [b["C_right"] - b["C_left"] for b in boxes if "C" in b]
colwm = np.min(colwm) if colwm else 0
boxes = Recognizer.sort_C_firstly(boxes, colwm / 2)
boxes[0]["cn"] = 0
cols = [[boxes[0]]]
right = boxes[0]["x1"]
for b in boxes[1:]:
b["cn"] = len(cols) - 1
lst_c = cols[-1]
# Check if new column
if b["x0"] >= right and \
lst_c[-1].get("C", "-1") != b.get("C", "-2"):
right = b["x1"]
b["cn"] += 1
cols.append([b])
continue
right = (right + b["x1"]) / 2.0
cols[-1].append(b)
# 5. Build table matrix
tbl = [[[] for _ in range(len(cols))] for _ in range(len(rows))]
for b in boxes:
tbl[b["rn"]][b["cn"]].append(b)
# 6. Identify header rows
hdset = set()
for i in range(len(tbl)):
cnt, h = 0, 0
for j, arr in enumerate(tbl[i]):
if not arr:
continue
cnt += 1
if any([a.get("H") for a in arr]) or \
(max_type == "Nu" and arr[0]["btype"] != "Nu"):
h += 1
if h / cnt > 0.5:
hdset.add(i)
# 7. Calculate spans
tbl = TableStructureRecognizer._cal_spans(boxes, rows, cols, tbl, html)
# 8. Output
if html:
return TableStructureRecognizer._html_table(cap, hdset, tbl)
else:
return TableStructureRecognizer._desc_table(cap, hdset, tbl, is_english)
```
### Block Type Classification
```python
@staticmethod
def blockType(b):
"""
Classify cell content type.
Types:
- Dt: Date (2024-01-01, 2024年1月)
- Nu: Number (123, 45.6, -78%)
- Ca: Code/ID (ABC-123, XYZ_456)
- En: English text
- NE: Number + English mix
- Sg: Single character
- Nr: Person name
- Tx: Short text (3-12 tokens)
- Lx: Long text (>12 tokens)
- Ot: Other
"""
patt = [
# Date patterns
("^(20|19)[0-9]{2}[年/-][0-9]{1,2}[月/-][0-9]{1,2}日*$", "Dt"),
(r"^(20|19)[0-9]{2}年$", "Dt"),
(r"^(20|19)[0-9]{2}[年-][0-9]{1,2}月*$", "Dt"),
("^[0-9]{1,2}[月-][0-9]{1,2}日*$", "Dt"),
(r"^第*[一二三四1-4]季度$", "Dt"),
(r"^(20|19)[0-9]{2}年*[一二三四1-4]季度$", "Dt"),
(r"^(20|19)[0-9]{2}[ABCDE]$", "Dt"),
# Number patterns
("^[0-9.,+%/ -]+$", "Nu"),
# Code patterns
(r"^[0-9A-Z/\._~-]+$", "Ca"),
# English text
(r"^[A-Z]*[a-z' -]+$", "En"),
# Number + English mix
(r"^[0-9.,+-]+[0-9A-Za-z/$¥%<>()' -]+$", "NE"),
# Single character
(r"^.{1}$", "Sg"),
]
for p, n in patt:
if re.search(p, b["text"].strip()):
return n
# Tokenize and classify
tks = [t for t in rag_tokenizer.tokenize(b["text"]).split() if len(t) > 1]
if len(tks) > 3:
return "Tx" if len(tks) < 12 else "Lx"
if len(tks) == 1 and rag_tokenizer.tag(tks[0]) == "nr":
return "Nr"
return "Ot"
```
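A few expected classifications, derivable from the patterns above (illustrative; free text that matches no pattern also depends on `rag_tokenizer`):

```python
from deepdoc.vision.table_structure_recognizer import TableStructureRecognizer

samples = ["2024年1月", "1,234.56", "ABC_12-3", "Total revenue", "中"]
for text in samples:
    print(text, "->", TableStructureRecognizer.blockType({"text": text}))
# Expected: Dt, Nu, Ca, En, Sg
```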
### HTML Output
```python
@staticmethod
def _html_table(cap, hdset, tbl):
"""
Convert table to HTML format.
Features:
- Caption support
- Header rows (<th>)
- Colspan/rowspan attributes
"""
html = "<table>"
if cap:
html += f"<caption>{cap}</caption>"
for i in range(len(tbl)):
row = "<tr>"
txts = []
for j, arr in enumerate(tbl[i]):
if arr is None: # Spanned cell
continue
if not arr:
row += "<td></td>" if i not in hdset else "<th></th>"
continue
# Get cell text
h = min(np.min([c["bottom"] - c["top"] for c in arr]) / 2, 10)
txt = " ".join([c["text"] for c in
Recognizer.sort_Y_firstly(arr, h)])
txts.append(txt)
# Build span attributes
sp = ""
if arr[0].get("colspan"):
sp = f"colspan={arr[0]['colspan']}"
if arr[0].get("rowspan"):
sp += f" rowspan={arr[0]['rowspan']}"
# Add cell
if i in hdset:
row += f"<th {sp}>{txt}</th>"
else:
row += f"<td {sp}>{txt}</td>"
if row != "<tr>":
row += "</tr>"
html += "\n" + row
html += "\n</table>"
return html
```
### Natural Language Output
```python
@staticmethod
def _desc_table(cap, hdr_rowno, tbl, is_english):
"""
Convert table to natural language format.
Output format:
"Header1: Value1; Header2: Value2 ——from 'Table Caption'"
This format is better for:
- RAG retrieval
- LLM understanding
- Semantic search
"""
clmno = len(tbl[0])
rowno = len(tbl)
# Build headers dictionary
headers = {}
for r in sorted(list(hdr_rowno)):
headers[r] = ["" for _ in range(clmno)]
for i in range(clmno):
if tbl[r][i]:
txt = " ".join([a["text"].strip() for a in tbl[r][i]])
headers[r][i] = txt
# Merge hierarchical headers
de = "的" if not is_english else " for "
# ... header merging logic
# Generate row descriptions
row_txt = []
for i in range(rowno):
if i in hdr_rowno:
continue
rtxt = []
# Find nearest header row
r = 0
if headers:
_arr = [(i - r, r) for r, _ in headers.items() if r < i]
if _arr:
_, r = min(_arr, key=lambda x: x[0])
# Build row text with headers
for j in range(clmno):
if not tbl[i][j]:
continue
txt = "".join([a["text"].strip() for a in tbl[i][j]])
if not txt:
continue
ctt = headers[r][j] if r in headers else ""
if ctt:
ctt += ""
ctt += txt
if ctt:
rtxt.append(ctt)
if rtxt:
row_txt.append("; ".join(rtxt))
# Add caption attribution
if cap:
from_ = " in " if is_english else "来自"
row_txt = [t + f"\t——{from_}"{cap}"" for t in row_txt]
return row_txt
```
### Span Calculation
```python
@staticmethod
def _cal_spans(boxes, rows, cols, tbl, html=True):
"""
Calculate colspan and rowspan for merged cells.
Process:
1. Find boxes marked as spanning cells
2. Calculate which rows/columns they span
3. Mark spanned cells as None (for HTML) or merge content
"""
# Calculate row/column boundaries
clft = [np.mean([c.get("C_left", c["x0"]) for c in cln]) for cln in cols]
crgt = [np.mean([c.get("C_right", c["x1"]) for c in cln]) for cln in cols]
rtop = [np.mean([c.get("R_top", c["top"]) for c in row]) for row in rows]
rbtm = [np.mean([c.get("R_btm", c["bottom"]) for c in row]) for row in rows]
for b in boxes:
if "SP" not in b: # Not a spanning cell
continue
b["colspan"] = [b["cn"]]
b["rowspan"] = [b["rn"]]
# Find spanned columns
for j in range(len(clft)):
if j == b["cn"]:
continue
if clft[j] + (crgt[j] - clft[j]) / 2 < b["H_left"]:
continue
if crgt[j] - (crgt[j] - clft[j]) / 2 > b["H_right"]:
continue
b["colspan"].append(j)
# Find spanned rows
for j in range(len(rtop)):
if j == b["rn"]:
continue
if rtop[j] + (rbtm[j] - rtop[j]) / 2 < b["H_top"]:
continue
if rbtm[j] - (rbtm[j] - rtop[j]) / 2 > b["H_bott"]:
continue
b["rowspan"].append(j)
# Update table with spans
# ... merge spanned cells, mark as None for HTML
return tbl
```
## Ascend NPU Support
```python
def _run_ascend_tsr(self, image_list, thr=0.2, batch_size=16):
"""
Run TSR on Huawei Ascend NPU.
Uses .om model format and ais_bench for inference.
"""
from ais_bench.infer.interface import InferSession
model_dir = os.path.join(get_project_base_directory(), "rag/res/deepdoc")
model_file_path = os.path.join(model_dir, "tsr.om")
device_id = int(os.getenv("ASCEND_LAYOUT_RECOGNIZER_DEVICE_ID", 0))
session = InferSession(device_id=device_id, model_path=model_file_path)
results = []
for batch_images in batched(image_list, batch_size):
inputs_list = self.preprocess(batch_images)
for ins in inputs_list:
output_list = session.infer(feeds=[ins["image"]], mode="static")
bb = self.postprocess(output_list, ins, thr)
results.append(bb)
return results
```
## Configuration
```python
# Model selection
TABLE_STRUCTURE_RECOGNIZER_TYPE = "onnx" # onnx, ascend
# Detection parameters
TSR_PARAMS = {
"threshold": 0.2, # Confidence threshold
"batch_size": 16, # Inference batch size
}
# Output format
TABLE_OUTPUT = {
"html": True, # HTML format (default)
"desc": False, # Natural language descriptions
}
```
## Integration with PDF Parser
```python
# In pdf_parser.py
def _table_transformer_job(self, zoomin):
"""
Run TSR on detected table regions.
Process:
1. Find all boxes with layout_type == "table"
2. Crop table regions from page images
3. Run TSR to get structure
4. Map OCR boxes to cells
"""
self.tsr = TableStructureRecognizer()
# Group tables by page
table_boxes = [b for b in self.boxes if b.get("layout_type") == "table"]
for tb in table_boxes:
# Crop table image
page_img = self.page_images[tb["page_number"]]
table_img = page_img.crop((
tb["x0"] * zoomin,
tb["top"] * zoomin,
tb["x1"] * zoomin,
tb["bottom"] * zoomin
))
# Run TSR
structure = self.tsr([np.array(table_img)])[0]
# Map structure to OCR boxes
self._map_structure_to_boxes(tb, structure)
```
## Related Files
- `/deepdoc/vision/table_structure_recognizer.py` - TSR implementation
- `/deepdoc/vision/recognizer.py` - Base recognizer class
- `/rag/res/deepdoc/tsr.onnx` - TSR ONNX model
- `/deepdoc/parser/pdf_parser.py` - PDF parser integration
# Task Executor Analysis
## Overview
The task executor is the main orchestration engine that processes documents asynchronously with queue-based processing.
## File Location
```
/rag/svr/task_executor.py
```
## Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│ TASK EXECUTOR ARCHITECTURE │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Main Event Loop (trio) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ report_status() - Heartbeat (30s interval) │ │
│ │ - Update server status │ │
│ │ - Cleanup stale tasks │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Task Manager Loop │ │
│ │ ├── collect() - Get task from Redis queue │ │
│ │ ├── do_handle_task() - Process with semaphore │ │
│ │ │ ├── build_chunks() │ │
│ │ │ ├── embedding() │ │
│ │ │ └── insert_es() │ │
│ │ └── handle_task() - ACK and error handling │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
## Main Entry Point
```python
async def main():
"""Main entry point for task executor."""
# Initialize connections
init_db_connection()
init_es_connection()
init_minio_connection()
# Start concurrent tasks
async with trio.open_nursery() as nursery:
# Heartbeat reporter
nursery.start_soon(report_status)
# Task processing loop
nursery.start_soon(task_loop)
async def task_loop():
"""Main task processing loop."""
while True:
try:
# Get task from queue
task = await collect()
if task:
# Process with concurrency limit
async with semaphore:
await do_handle_task(task)
except Exception as e:
logging.exception(e)
await trio.sleep(1)
```
## Task Collection
```python
async def collect():
"""
Collect task from Redis queue.
Returns:
Task dict or None if no tasks available
"""
# Try to get from queue
result = REDIS_CONN.queue_consume(
queue_name=get_queue_name(),
consumer_group=SVR_CONSUMER_GROUP_NAME,
block=5000 # 5 second timeout
)
if not result:
return None
# Parse task
message_id, task_data = result
task = json.loads(task_data["task"])
# Get full task context
task_info = TaskService.get_task(task["id"])
if not task_info:
# Task canceled or max retries exceeded
REDIS_CONN.queue_ack(get_queue_name(), message_id)
return None
task_info["message_id"] = message_id
return task_info
```
## Task Handling
```python
async def do_handle_task(task):
"""
Main task processing logic.
Steps:
1. Download file from MinIO
2. Build chunks (parse + chunk + enrich)
3. Generate embeddings
4. Index in Elasticsearch
"""
doc_id = task["doc_id"]
task_id = task["id"]
try:
# Update progress: Starting
TaskService.update_progress(task_id, {
"progress": 0.1,
"progress_msg": "Starting document processing..."
})
# 1. Download file
file_blob = await download_from_minio(task)
# 2. Build chunks
chunks = await build_chunks(task, file_blob)
if not chunks:
TaskService.update_progress(task_id, {
"progress": -1,
"progress_msg": "No content extracted"
})
return
# 3. Generate embeddings
chunks = await embedding(chunks, task)
# 4. Index in Elasticsearch
await insert_es(chunks, task)
# 5. Update success
TaskService.update_progress(task_id, {
"progress": 1.0,
"progress_msg": f"Completed. {len(chunks)} chunks created.",
"chunk_ids": " ".join([c["id"] for c in chunks])
})
except Exception as e:
logging.exception(e)
TaskService.update_progress(task_id, {
"progress": -1,
"progress_msg": str(e)
})
async def handle_task(task, result):
"""
Post-processing: ACK queue and cleanup.
"""
REDIS_CONN.queue_ack(
get_queue_name(),
task["message_id"]
)
```
## Chunk Building
```python
async def build_chunks(task, file_blob):
"""
Build chunks from document.
Process:
1. Select parser based on file type
2. Parse document
3. Chunk content
4. Enrich chunks (keywords, questions)
"""
file_name = task["name"]
parser_id = task["parser_id"]
parser_config = task["parser_config"]
# Select parser
if file_name.endswith(".pdf"):
if parser_config.get("layout_recognize") == "DeepDOC":
parser = RAGFlowPdfParser()
elif parser_config.get("layout_recognize") == "Plain":
parser = PlainParser()
else:
parser = VisionParser()
elif file_name.endswith(".docx"):
parser = DocxParser()
elif file_name.endswith(".xlsx"):
parser = ExcelParser()
else:
parser = TextParser()
# Parse document
sections = parser.parse(
file_blob,
from_page=task.get("from_page", 0),
to_page=task.get("to_page", -1),
callback=lambda p, m: TaskService.update_progress(task["id"], {
"progress": p,
"progress_msg": m
})
)
# Chunk content
chunks = naive_merge(
sections,
chunk_token_num=parser_config.get("chunk_token_num", 512),
delimiter=parser_config.get("delimiter", "\n。"),
overlapped_percent=parser_config.get("overlapped_percent", 0)
)
# Build chunk records
chunk_records = []
for i, (content, positions) in enumerate(chunks):
chunk_id = xxhash.xxh64((content + task["doc_id"]).encode("utf-8")).hexdigest()
chunk_records.append({
"id": chunk_id,
"doc_id": task["doc_id"],
"kb_id": task["kb_id"],
"content_with_weight": content,
"docnm_kwd": task["name"],
"page_num_int": extract_page_nums(positions),
"position_int": encode_positions(positions),
"create_time": datetime.now().isoformat(),
})
# Enrich chunks
if parser_config.get("auto_keywords"):
await add_keywords(chunk_records, task)
if parser_config.get("auto_questions"):
await add_questions(chunk_records, task)
return chunk_records
```
## Embedding Generation
```python
async def embedding(chunks, task):
"""
Generate embeddings for chunks.
"""
embd_mdl = LLMBundle(
task["tenant_id"],
LLMType.EMBEDDING,
task.get("embd_id")
)
batch_size = 16
total_tokens = 0
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i+batch_size]
# Prepare texts
texts = [c["content_with_weight"] for c in batch]
# Generate embeddings
embeddings, tokens = embd_mdl.encode(texts)
total_tokens += tokens
# Store vectors
for j, emb in enumerate(embeddings):
chunk_idx = i + j
vec_field = f"q_{len(emb)}_vec"
chunks[chunk_idx][vec_field] = emb.tolist()
# Update progress
progress = 0.7 + 0.2 * (i / len(chunks))
TaskService.update_progress(task["id"], {
"progress": progress,
"progress_msg": f"Embedding {i+len(batch)}/{len(chunks)} chunks"
})
return chunks
```
## Elasticsearch Indexing
```python
async def insert_es(chunks, task):
"""
Bulk insert chunks to Elasticsearch.
"""
es = get_es_connection()
index_name = f"ragflow_{task['kb_id']}"
# Ensure index exists
if not es.indices.exists(index=index_name):
es.indices.create(index=index_name, body=ES_MAPPING)
# Bulk insert
bulk_size = 64
for i in range(0, len(chunks), bulk_size):
batch = chunks[i:i+bulk_size]
actions = []
for chunk in batch:
actions.append({
"_index": index_name,
"_id": chunk["id"],
"_source": chunk
})
helpers.bulk(es, actions)
# Update progress
progress = 0.9 + 0.1 * (i / len(chunks))
TaskService.update_progress(task["id"], {
"progress": progress,
"progress_msg": f"Indexing {i+len(batch)}/{len(chunks)} chunks"
})
```
## Concurrency Control
```python
# Global semaphores
task_semaphore = trio.Semaphore(MAX_CONCURRENT_TASKS) # 5
chunk_semaphore = trio.Semaphore(MAX_CONCURRENT_CHUNK_BUILDERS) # 1
minio_semaphore = trio.Semaphore(MAX_CONCURRENT_MINIO) # 10
async def do_handle_task(task):
async with task_semaphore:
# ... processing
async def build_chunks(task, blob):
async with chunk_semaphore:
# ... chunk building
async def download_from_minio(task):
async with minio_semaphore:
# ... download
```
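The general pattern behind these limits is standard trio usage: a `trio.Semaphore` bounds how many tasks run at once while a nursery spawns them all. The sketch below is not RAGFlow's exact loop; the `worker`/`run_all` names and the sleep are illustrative.

```python
import trio

async def worker(sem: trio.Semaphore, task_id: int):
    async with sem:
        await trio.sleep(0.1)          # stand-in for parsing / embedding work
        print(f"task {task_id} done")

async def run_all(task_ids, limit=5):
    # At most `limit` workers hold the semaphore at any time; the rest wait.
    sem = trio.Semaphore(limit)
    async with trio.open_nursery() as nursery:
        for tid in task_ids:
            nursery.start_soon(worker, sem, tid)

trio.run(run_all, list(range(12)))
```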
## Progress Tracking
```python
# Progress stages:
# 0.0 - 0.1: Starting
# 0.1 - 0.4: Image extraction (PDF)
# 0.4 - 0.6: OCR
# 0.6 - 0.7: Layout + text merge
# 0.7 - 0.9: Embedding
# 0.9 - 1.0: Indexing
def update_progress(task_id, info):
"""
Thread-safe progress update.
Rules:
- progress_msg: Always append
- progress: Only update if new > current (or -1 for failure)
"""
# ... implementation
```
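A minimal sketch of those two rules, using an in-memory dict in place of the real TaskService/database layer and omitting the locking that makes the production version thread-safe; the storage and field names here are illustrative.

```python
_tasks: dict[str, dict] = {}

def update_progress(task_id: str, info: dict) -> None:
    rec = _tasks.setdefault(task_id, {"progress": 0.0, "progress_msg": ""})
    msg = info.get("progress_msg")
    if msg:
        # Rule 1: messages are appended, never overwritten
        rec["progress_msg"] = (rec["progress_msg"] + "\n" + msg).strip()
    if "progress" in info:
        new = info["progress"]
        # Rule 2: -1 always wins (failure); otherwise progress only moves forward
        if new == -1 or new > rec["progress"]:
            rec["progress"] = new
```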
## Task Types
```python
TASK_TYPES = {
"": "standard", # Standard document parsing
"graphrag": "graphrag", # Knowledge graph extraction
"raptor": "raptor", # RAPTOR tree building
"mindmap": "mindmap", # Mind map generation
"dataflow": "dataflow", # Custom pipeline
}
async def do_handle_task(task):
task_type = task.get("task_type", "")
if task_type == "graphrag":
await handle_graphrag_task(task)
elif task_type == "raptor":
await handle_raptor_task(task)
else:
await handle_standard_task(task)
```
## Configuration
```python
# Environment variables
MAX_CONCURRENT_TASKS = int(os.environ.get("MAX_CONCURRENT_TASKS", 5))
MAX_CONCURRENT_CHUNK_BUILDERS = int(os.environ.get("MAX_CONCURRENT_CHUNK_BUILDERS", 1))
MAX_CONCURRENT_MINIO = int(os.environ.get("MAX_CONCURRENT_MINIO", 10))
DOC_MAXIMUM_SIZE = 100 * 1024 * 1024 # 100MB
DOC_BULK_SIZE = 64
EMBEDDING_BATCH_SIZE = 16
```
## Related Files
- `/rag/svr/task_executor.py` - Main executor
- `/api/db/services/task_service.py` - Task management
- `/rag/app/naive.py` - Document parsing
- `/rag/nlp/__init__.py` - Chunking