Feat: Add ProgressTracker and vlm-direct backend for MinerU integration

- Add ProgressTracker class for real-time progress parsing from mineru stdout
- Implement intelligent stall-based timeout (5min no progress triggers warning)
- Add vlm-direct backend mode to directly connect to vLLM server
- Improve API mode with streaming response, HTTP session reuse, and auto-retry
- Add heartbeat updates during long-running operations
- Support dynamic timeout based on page count

This addresses the deadlock issue where tasks hang indefinitely when using
MinerU API backend, even after the API returns 200 OK.
This commit is contained in:
少卿 2025-12-05 15:44:31 +08:00
parent 885eb2eab9
commit 83243ecb5a
2 changed files with 407 additions and 34 deletions

View file

@ -33,6 +33,8 @@ from typing import Any, Callable, Optional
import numpy as np
import pdfplumber
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from PIL import Image
from strenum import StrEnum
@ -43,6 +45,120 @@ if LOCK_KEY_pdfplumber not in sys.modules:
sys.modules[LOCK_KEY_pdfplumber] = threading.Lock()
class ProgressTracker:
"""
Progress tracker: parse mineru output for progress and implement intelligent stall-based timeout.
Features:
1. Parse tqdm progress bar output (e.g., "100%|████| 659/659")
2. Intelligent stall-based timeout (reset timer when progress updates)
3. Report progress to RagFlow callback
Design principles:
- Standalone helper class, does not modify MinerUParser core logic
- Parsing failure does not affect functionality, just no progress display
- Compatible with upstream code changes
"""
# Match tqdm progress bar format: "100%|████████████| 659/659" or "Page 45/659"
TQDM_PATTERN = re.compile(r'(\d+)%\|[^\|]*\|\s*(\d+)/(\d+)')
PAGE_PATTERN = re.compile(r'[Pp]age\s*(\d+)\s*/\s*(\d+)')
STEP_PATTERN = re.compile(r'(\d+)/(\d+)\s*\[') # "45/659 [00:30<01:00"
def __init__(
self,
callback: Optional[Callable] = None,
stall_timeout: int = 300, # 5 minutes without progress triggers timeout
progress_base: float = 0.25, # Base progress value
progress_range: float = 0.45, # Progress range (0.25 ~ 0.70)
):
self.callback = callback
self.stall_timeout = stall_timeout
self.progress_base = progress_base
self.progress_range = progress_range
self.last_progress_time = time.time()
self.last_progress_value = 0.0
self.total_pages = 0
self.current_page = 0
self.logger = logging.getLogger(self.__class__.__name__)
def parse_line(self, line: str) -> Optional[float]:
"""
Parse a line of output and extract progress information.
Returns progress value 0.0 ~ 1.0, or None if cannot parse.
"""
line = line.strip()
if not line:
return None
# Try to match tqdm format: "100%|████| 659/659"
if match := self.TQDM_PATTERN.search(line):
percent = int(match.group(1))
current = int(match.group(2))
total = int(match.group(3))
self.current_page = current
self.total_pages = total
return percent / 100.0
# Try to match "Page 45/659" format
if match := self.PAGE_PATTERN.search(line):
current = int(match.group(1))
total = int(match.group(2))
self.current_page = current
self.total_pages = total
return current / total if total > 0 else None
# Try to match "45/659 [" format
if match := self.STEP_PATTERN.search(line):
current = int(match.group(1))
total = int(match.group(2))
self.current_page = current
self.total_pages = total
return current / total if total > 0 else None
return None
def on_output(self, line: str, prefix: str = "STDOUT") -> None:
"""
Process a line of output. Parse progress and update state.
"""
progress = self.parse_line(line)
if progress is not None and progress > self.last_progress_value:
self.last_progress_value = progress
self.last_progress_time = time.time() # Reset timeout timer!
# Calculate actual progress value (map to progress_base ~ progress_base+progress_range)
actual_progress = self.progress_base + (progress * self.progress_range)
if self.callback:
msg = f"[MinerU] Processing page {self.current_page}/{self.total_pages}" if self.total_pages else f"[MinerU] Progress: {progress:.0%}"
try:
self.callback(actual_progress, msg)
except Exception as e:
self.logger.warning(f"[MinerU] Progress callback failed: {e}")
def is_stalled(self) -> bool:
"""Check if stalled (no progress update for a long time)"""
return (time.time() - self.last_progress_time) > self.stall_timeout
def get_stall_info(self) -> str:
"""Get stall information for error reporting"""
stall_duration = int(time.time() - self.last_progress_time)
return (
f"No progress for {stall_duration}s "
f"(stalled at {self.last_progress_value:.0%}, "
f"page {self.current_page}/{self.total_pages or '?'})"
)
def reset(self) -> None:
"""Reset state (for retry)"""
self.last_progress_time = time.time()
self.last_progress_value = 0.0
self.current_page = 0
class MinerUContentType(StrEnum):
IMAGE = "image"
TABLE = "table"
@ -54,13 +170,30 @@ class MinerUContentType(StrEnum):
class MinerUParser(RAGFlowPdfParser):
"""MinerU PDF Parser with multiple backend support.
Backends:
- pipeline: Local MinerU with traditional pipeline (requires GPU)
- vlm-http-client: MinerU CLI calling external vLLM server
- vlm-direct: NEW! Direct vLLM API call from RagFlow (bypasses mineru-api)
- api: External mineru-api service (legacy, prone to timeout issues)
"""
# Timeout settings for different operations
API_CONNECT_TIMEOUT = 30 # seconds to establish connection
API_READ_TIMEOUT_PER_PAGE = 60 # seconds per page for processing
API_MAX_TIMEOUT = 1800 # maximum total timeout
HEARTBEAT_INTERVAL = 30 # seconds between progress updates
def __init__(self, mineru_path: str = "mineru", mineru_api: str = "http://host.docker.internal:9987", mineru_server_url: str = ""):
self.mineru_path = Path(mineru_path)
self.mineru_api = mineru_api.rstrip("/")
self.mineru_server_url = mineru_server_url.rstrip("/")
self.using_api = False
self.using_vlm_direct = False # New flag for direct vLLM mode
self.outlines = []
self.logger = logging.getLogger(self.__class__.__name__)
self._session = None # Reusable HTTP session
def _extract_zip_no_root(self, zip_path, extract_to, root_dir):
self.logger.info(f"[MinerU] Extract zip: zip_path={zip_path}, extract_to={extract_to}, root_hint={root_dir}")
@ -96,9 +229,25 @@ class MinerUParser(RAGFlowPdfParser):
with open(full_path, "wb") as f:
f.write(zip_ref.read(filename))
def _get_http_session(self) -> requests.Session:
"""Get or create a reusable HTTP session with retry logic."""
if self._session is None:
self._session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "POST"],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)
return self._session
def _is_http_endpoint_valid(self, url, timeout=5):
try:
response = requests.head(url, timeout=timeout, allow_redirects=True)
session = self._get_http_session()
response = session.head(url, timeout=timeout, allow_redirects=True)
return response.status_code in [200, 301, 302, 307, 308]
except Exception:
return False
@ -106,9 +255,9 @@ class MinerUParser(RAGFlowPdfParser):
def check_installation(self, backend: str = "pipeline", server_url: Optional[str] = None) -> tuple[bool, str]:
reason = ""
valid_backends = ["pipeline", "vlm-http-client", "vlm-transformers", "vlm-vllm-engine"]
valid_backends = ["pipeline", "vlm-http-client", "vlm-transformers", "vlm-vllm-engine", "vlm-direct"]
if backend not in valid_backends:
reason = "[MinerU] Invalid backend '{backend}'. Valid backends are: {valid_backends}"
reason = f"[MinerU] Invalid backend '{backend}'. Valid backends are: {valid_backends}"
self.logger.warning(reason)
return False, reason
@ -126,6 +275,31 @@ class MinerUParser(RAGFlowPdfParser):
if server_url is None:
server_url = self.mineru_server_url
# NEW: vlm-direct mode - direct connection to vLLM server, bypassing mineru-api
if backend == "vlm-direct" and server_url:
try:
# Check if vLLM server is accessible (OpenAI-compatible API)
health_endpoints = ["/health", "/v1/models", "/openapi.json"]
server_accessible = False
for endpoint in health_endpoints:
if self._is_http_endpoint_valid(server_url.rstrip("/") + endpoint):
server_accessible = True
break
if server_accessible:
self.logger.info(f"[MinerU] vlm-direct server accessible: {server_url}")
self.using_api = False
self.using_vlm_direct = True
return True, reason
else:
reason = f"[MinerU] vlm-direct server not accessible: {server_url}"
self.logger.warning(reason)
return False, reason
except Exception as e:
reason = f"[MinerU] vlm-direct server check failed: {server_url}: {e}"
self.logger.warning(reason)
return False, reason
if backend == "vlm-http-client" and server_url:
try:
server_accessible = self._is_http_endpoint_valid(server_url + "/openapi.json")
@ -185,7 +359,9 @@ class MinerUParser(RAGFlowPdfParser):
def _run_mineru(
self, input_path: Path, output_dir: Path, method: str = "auto", backend: str = "pipeline", lang: Optional[str] = None, server_url: Optional[str] = None, callback: Optional[Callable] = None
):
if self.using_api:
if self.using_vlm_direct:
self._run_mineru_vlm_direct(input_path, output_dir, server_url, callback)
elif self.using_api:
self._run_mineru_api(input_path, output_dir, method, backend, lang, callback)
else:
self._run_mineru_executable(input_path, output_dir, method, backend, lang, server_url, callback)
@ -223,32 +399,195 @@ class MinerUParser(RAGFlowPdfParser):
}
headers = {"Accept": "application/json"}
# Calculate dynamic timeout based on expected page count
# Estimate ~60s per page, with minimum 5 minutes
estimated_pages = data.get("end_page_id", 100) - data.get("start_page_id", 0)
dynamic_timeout = max(300, min(estimated_pages * self.API_READ_TIMEOUT_PER_PAGE, self.API_MAX_TIMEOUT))
self.logger.info(f"[MinerU] invoke api: {self.mineru_api}/file_parse (timeout={dynamic_timeout}s)")
if callback:
callback(0.20, f"[MinerU] invoke api: {self.mineru_api}/file_parse")
# Use session with retry logic and streaming response
session = self._get_http_session()
last_heartbeat = time.time()
try:
self.logger.info(f"[MinerU] invoke api: {self.mineru_api}/file_parse")
if callback:
callback(0.20, f"[MinerU] invoke api: {self.mineru_api}/file_parse")
response = requests.post(url=f"{self.mineru_api}/file_parse", files=files, data=data, headers=headers, timeout=1800)
response.raise_for_status()
if response.headers.get("Content-Type") == "application/zip":
self.logger.info(f"[MinerU] zip file returned, saving to {output_zip_path}...")
if callback:
callback(0.30, f"[MinerU] zip file returned, saving to {output_zip_path}...")
with open(output_zip_path, "wb") as f:
f.write(response.content)
self.logger.info(f"[MinerU] Unzip to {output_path}...")
self._extract_zip_no_root(output_zip_path, output_path, pdf_file_name + "/")
if callback:
callback(0.40, f"[MinerU] Unzip to {output_path}...")
else:
self.logger.warning("[MinerU] not zip returned from api%s " % response.headers.get("Content-Type"))
# Stream response to allow progress monitoring
with session.post(
url=f"{self.mineru_api}/file_parse",
files=files,
data=data,
headers=headers,
timeout=(self.API_CONNECT_TIMEOUT, dynamic_timeout),
stream=True
) as response:
response.raise_for_status()
if response.headers.get("Content-Type") == "application/zip":
self.logger.info(f"[MinerU] zip file returned, saving to {output_zip_path}...")
if callback:
callback(0.30, f"[MinerU] zip file returned, saving to {output_zip_path}...")
# Stream write with progress updates
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
with open(output_zip_path, "wb") as f:
for chunk in response.iter_content(chunk_size=1024 * 1024): # 1MB chunks
if chunk:
f.write(chunk)
downloaded += len(chunk)
# Send heartbeat every 30 seconds during download
now = time.time()
if callback and now - last_heartbeat >= self.HEARTBEAT_INTERVAL:
progress = 0.30 + (0.10 * downloaded / total_size) if total_size else 0.35
callback(progress, f"[MinerU] Downloading... {downloaded // (1024*1024)}MB")
last_heartbeat = now
self.logger.info(f"[MinerU] Unzip to {output_path}...")
self._extract_zip_no_root(output_zip_path, output_path, pdf_file_name + "/")
if callback:
callback(0.40, f"[MinerU] Unzip to {output_path}...")
else:
self.logger.warning("[MinerU] not zip returned from api: %s" % response.headers.get("Content-Type"))
except requests.exceptions.Timeout as e:
raise RuntimeError(f"[MinerU] API timeout after {dynamic_timeout}s. Consider using vlm-direct mode. Error: {e}")
except requests.exceptions.ConnectionError as e:
raise RuntimeError(f"[MinerU] API connection error. Is mineru-api running? Error: {e}")
except Exception as e:
raise RuntimeError(f"[MinerU] api failed with exception {e}")
self.logger.info("[MinerU] Api completed successfully.")
raise RuntimeError(f"[MinerU] API failed with exception: {e}")
self.logger.info("[MinerU] API completed successfully.")
def _run_mineru_vlm_direct(
self, input_path: Path, output_dir: Path, server_url: Optional[str] = None, callback: Optional[Callable] = None
):
"""
Direct vLLM integration mode - bypasses mineru-api and calls vLLM server directly.
This method uses mineru's internal vlm-http-client backend to connect directly to
a vLLM server, eliminating the mineru-api middleware that can cause deadlocks.
Requires: mineru[core] package installed
"""
if not server_url:
server_url = self.mineru_server_url
if not server_url:
raise RuntimeError("[MinerU] vlm-direct mode requires MINERU_SERVER_URL to be set")
self.logger.info(f"[MinerU] Starting vlm-direct mode with server: {server_url}")
if callback:
callback(0.15, f"[MinerU] Connecting to vLLM server: {server_url}")
try:
# Try to import mineru's vlm_analyze module
from mineru.backend.vlm.vlm_analyze import doc_analyze
from mineru.data.data_reader_writer import FileBasedDataWriter
except ImportError as e:
self.logger.warning(f"[MinerU] mineru package not available, falling back to executable mode: {e}")
# Fallback to executable mode with vlm-http-client backend
self._run_mineru_executable(
input_path, output_dir,
method="auto",
backend="vlm-http-client",
server_url=server_url,
callback=callback
)
return
pdf_file_name = input_path.stem.replace(" ", "")
local_md_dir = output_dir / pdf_file_name / "vlm"
local_image_dir = local_md_dir / "images"
local_md_dir.mkdir(parents=True, exist_ok=True)
local_image_dir.mkdir(parents=True, exist_ok=True)
self.logger.info(f"[MinerU] Output directory: {local_md_dir}")
if callback:
callback(0.20, f"[MinerU] Output directory: {local_md_dir}")
# Read PDF bytes
with open(input_path, "rb") as f:
pdf_bytes = f.read()
# Create image writer for mineru
image_writer = FileBasedDataWriter(str(local_image_dir))
try:
if callback:
callback(0.25, "[MinerU] Starting VLM processing (direct mode)...")
# Call mineru's doc_analyze with http-client backend
middle_json, results = doc_analyze(
pdf_bytes=pdf_bytes,
image_writer=image_writer,
backend="http-client",
server_url=server_url,
max_concurrency=10,
http_timeout=self.API_READ_TIMEOUT_PER_PAGE,
max_retries=3,
retry_backoff_factor=0.5,
)
if callback:
callback(0.60, "[MinerU] VLM processing completed, writing output...")
# Write content_list.json (required by _read_output)
content_list = self._convert_middle_json_to_content_list(middle_json)
content_list_path = local_md_dir / f"{pdf_file_name}_content_list.json"
with open(content_list_path, "w", encoding="utf-8") as f:
json.dump(content_list, f, ensure_ascii=False, indent=2)
self.logger.info(f"[MinerU] vlm-direct completed, output: {content_list_path}")
if callback:
callback(0.70, f"[MinerU] vlm-direct completed successfully")
except Exception as e:
self.logger.error(f"[MinerU] vlm-direct failed: {e}")
raise RuntimeError(f"[MinerU] vlm-direct mode failed: {e}")
def _convert_middle_json_to_content_list(self, middle_json: dict) -> list:
"""Convert mineru's middle_json format to content_list format."""
content_list = []
pdf_info = middle_json.get("pdf_info", [])
for page_idx, page_info in enumerate(pdf_info):
preproc_blocks = page_info.get("preproc_blocks", [])
for block in preproc_blocks:
block_type = block.get("type", "text")
item = {
"type": block_type,
"page_idx": page_idx,
"bbox": block.get("bbox", [0, 0, 0, 0]),
}
if block_type == "text":
lines = block.get("lines", [])
text = " ".join(
span.get("content", "")
for line in lines
for span in line.get("spans", [])
)
item["text"] = text
elif block_type == "table":
item["table_body"] = block.get("html", "")
item["table_caption"] = block.get("caption", [])
item["table_footnote"] = block.get("footnote", [])
elif block_type == "image":
item["image_caption"] = block.get("caption", [])
item["image_footnote"] = block.get("footnote", [])
item["img_path"] = block.get("img_path", "")
elif block_type == "equation":
item["text"] = block.get("latex", "")
content_list.append(item)
return content_list
def _run_mineru_executable(
self, input_path: Path, output_dir: Path, method: str = "auto", backend: str = "pipeline", lang: Optional[str] = None, server_url: Optional[str] = None, callback: Optional[Callable] = None
@ -258,10 +597,12 @@ class MinerUParser(RAGFlowPdfParser):
cmd.extend(["-b", backend])
if lang:
cmd.extend(["-l", lang])
if server_url and backend == "vlm-http-client":
if server_url and backend in ("vlm-http-client", "vlm-direct"):
cmd.extend(["-u", server_url])
self.logger.info(f"[MinerU] Running command: {' '.join(cmd)}")
if callback:
callback(0.20, f"[MinerU] Starting {backend} backend...")
subprocess_kwargs = {
"stdout": subprocess.PIPE,
@ -277,6 +618,14 @@ class MinerUParser(RAGFlowPdfParser):
process = subprocess.Popen(cmd, **subprocess_kwargs)
stdout_queue, stderr_queue = Queue(), Queue()
# Create progress tracker (intelligent timeout + progress parsing)
tracker = ProgressTracker(
callback=callback,
stall_timeout=300, # 5 minutes without progress triggers warning
progress_base=0.25,
progress_range=0.45,
)
def enqueue_output(pipe, queue, prefix):
for line in iter(pipe.readline, ""):
@ -294,10 +643,24 @@ class MinerUParser(RAGFlowPdfParser):
prefix, line = q.get_nowait()
if prefix == "STDOUT":
self.logger.info(f"[MinerU] {line}")
# Parse progress and update callback
tracker.on_output(line, prefix)
else:
self.logger.warning(f"[MinerU] {line}")
except Empty:
pass
# Check if stalled (progress-based intelligent timeout)
if tracker.is_stalled():
stall_info = tracker.get_stall_info()
self.logger.warning(f"[MinerU] Process appears stalled: {stall_info}")
if callback:
callback(tracker.progress_base + tracker.last_progress_value * tracker.progress_range,
f"[MinerU] Warning: {stall_info}")
# Note: only warning here, not forcibly terminating the process
# because some PDFs do take a long time to process certain pages
tracker.last_progress_time = time.time() # Reset to avoid repeated warnings
time.sleep(0.1)
return_code = process.wait()

View file

@ -60,11 +60,21 @@ def by_deepdoc(filename, binary=None, from_page=0, to_page=100000, lang="Chinese
def by_mineru(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", callback=None, pdf_cls = None ,**kwargs):
mineru_executable = os.environ.get("MINERU_EXECUTABLE", "mineru")
mineru_api = os.environ.get("MINERU_APISERVER", "http://host.docker.internal:9987")
pdf_parser = MinerUParser(mineru_path=mineru_executable, mineru_api=mineru_api)
mineru_server_url = os.environ.get("MINERU_SERVER_URL", "")
mineru_backend = os.environ.get("MINERU_BACKEND", "pipeline")
parse_method = kwargs.get("parse_method", "raw")
# Initialize parser with all configuration
pdf_parser = MinerUParser(
mineru_path=mineru_executable,
mineru_api=mineru_api,
mineru_server_url=mineru_server_url
)
if not pdf_parser.check_installation():
callback(-1, "MinerU not found.")
# Check installation with the specified backend
ok, reason = pdf_parser.check_installation(backend=mineru_backend, server_url=mineru_server_url)
if not ok:
callback(-1, f"MinerU not found or backend unavailable: {reason}")
return None, None, pdf_parser
sections, tables = pdf_parser.parse_pdf(
@ -72,8 +82,8 @@ def by_mineru(filename, binary=None, from_page=0, to_page=100000, lang="Chinese"
binary=binary,
callback=callback,
output_dir=os.environ.get("MINERU_OUTPUT_DIR", ""),
backend=os.environ.get("MINERU_BACKEND", "pipeline"),
server_url=os.environ.get("MINERU_SERVER_URL", ""),
backend=mineru_backend,
server_url=mineru_server_url,
delete_output=bool(int(os.environ.get("MINERU_DELETE_OUTPUT", 1))),
parse_method=parse_method
)