612 lines
25 KiB
Python
612 lines
25 KiB
Python
"""Docling serve manager for local document processing service."""
|
|
|
|
import asyncio
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
from typing import Optional, Tuple, Dict, Any, List, AsyncIterator
|
|
from utils.logging_config import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
|
|
class DoclingManager:
|
|
"""Manages local docling serve instance as external process."""
|
|
|
|
_instance = None
|
|
_initialized = False
|
|
|
|
def __new__(cls):
|
|
if cls._instance is None:
|
|
cls._instance = super().__new__(cls)
|
|
return cls._instance
|
|
|
|
def __init__(self):
|
|
# Only initialize once
|
|
if self._initialized:
|
|
return
|
|
|
|
self._process: Optional[subprocess.Popen] = None
|
|
self._port = 5001
|
|
self._host = self._get_host_for_containers() # Get appropriate host IP based on runtime
|
|
self._running = False
|
|
self._external_process = False
|
|
|
|
# PID file to track docling-serve across sessions (in current working directory)
|
|
from pathlib import Path
|
|
self._pid_file = Path.cwd() / ".docling.pid"
|
|
|
|
# Log storage - simplified, no queue
|
|
self._log_buffer: List[str] = []
|
|
self._max_log_lines = 1000
|
|
self._log_lock = threading.Lock() # Thread-safe access to log buffer
|
|
|
|
self._initialized = True
|
|
|
|
# Try to recover existing process from PID file
|
|
self._recover_from_pid_file()
|
|
|
|
def _get_host_for_containers(self) -> str:
|
|
"""
|
|
Return a host IP that containers can reach (a bridge/CNI gateway).
|
|
Prefers Docker/Podman network gateways; falls back to bridge interfaces.
|
|
"""
|
|
import subprocess, json, shutil, re, logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def run(cmd, timeout=2, text=True):
|
|
return subprocess.run(cmd, capture_output=True, text=text, timeout=timeout)
|
|
|
|
gateways = []
|
|
compose_gateways = [] # Highest priority - compose project networks
|
|
active_gateways = [] # Medium priority - networks with containers
|
|
|
|
# ---- Docker: enumerate networks and collect gateways
|
|
if shutil.which("docker"):
|
|
try:
|
|
ls = run(["docker", "network", "ls", "--format", "{{.Name}}"])
|
|
if ls.returncode == 0:
|
|
for name in filter(None, ls.stdout.splitlines()):
|
|
try:
|
|
insp = run(["docker", "network", "inspect", name, "--format", "{{json .}}"])
|
|
if insp.returncode == 0 and insp.stdout.strip():
|
|
nw = json.loads(insp.stdout)[0] if insp.stdout.strip().startswith("[") else json.loads(insp.stdout)
|
|
ipam = nw.get("IPAM", {})
|
|
containers = nw.get("Containers", {})
|
|
for cfg in ipam.get("Config", []) or []:
|
|
gw = cfg.get("Gateway")
|
|
if gw:
|
|
# Highest priority: compose networks (ending in _default)
|
|
if name.endswith("_default"):
|
|
compose_gateways.append(gw)
|
|
# Medium priority: networks with active containers
|
|
elif len(containers) > 0:
|
|
active_gateways.append(gw)
|
|
# Low priority: empty networks
|
|
else:
|
|
gateways.append(gw)
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
# ---- Podman: enumerate networks and collect gateways (netavark)
|
|
if shutil.which("podman"):
|
|
try:
|
|
# modern podman supports JSON format
|
|
ls = run(["podman", "network", "ls", "--format", "json"])
|
|
if ls.returncode == 0 and ls.stdout.strip():
|
|
for net in json.loads(ls.stdout):
|
|
name = net.get("name") or net.get("Name")
|
|
if not name:
|
|
continue
|
|
try:
|
|
insp = run(["podman", "network", "inspect", name, "--format", "json"])
|
|
if insp.returncode == 0 and insp.stdout.strip():
|
|
arr = json.loads(insp.stdout)
|
|
for item in (arr if isinstance(arr, list) else [arr]):
|
|
for sn in item.get("subnets", []) or []:
|
|
gw = sn.get("gateway")
|
|
if gw:
|
|
# Prioritize compose/project networks
|
|
if name.endswith("_default") or "_" in name:
|
|
compose_gateways.append(gw)
|
|
else:
|
|
gateways.append(gw)
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
# ---- Fallback: parse host interfaces for common bridges
|
|
if not gateways:
|
|
try:
|
|
if shutil.which("ip"):
|
|
show = run(["ip", "-o", "-4", "addr", "show"])
|
|
if show.returncode == 0:
|
|
for line in show.stdout.splitlines():
|
|
# e.g. "12: br-3f0f... inet 172.18.0.1/16 ..."
|
|
m = re.search(r"^\d+:\s+([a-zA-Z0-9_.:-]+)\s+.*\binet\s+(\d+\.\d+\.\d+\.\d+)/", line)
|
|
if not m:
|
|
continue
|
|
ifname, ip = m.group(1), m.group(2)
|
|
if ifname == "docker0" or ifname.startswith(("br-", "cni")):
|
|
gateways.append(ip)
|
|
else:
|
|
# As a last resort, try net-tools ifconfig output
|
|
if shutil.which("ifconfig"):
|
|
show = run(["ifconfig"])
|
|
for block in show.stdout.split("\n\n"):
|
|
if any(block.strip().startswith(n) for n in ("docker0", "cni", "br-")):
|
|
m = re.search(r"inet (?:addr:)?(\d+\.\d+\.\d+\.\d+)", block)
|
|
if m:
|
|
gateways.append(m.group(1))
|
|
except Exception:
|
|
pass
|
|
|
|
# Dedup, prioritizing: 1) compose networks, 2) active networks, 3) all others
|
|
seen, uniq = set(), []
|
|
# First: compose project networks (_default suffix)
|
|
for ip in compose_gateways:
|
|
if ip not in seen:
|
|
uniq.append(ip)
|
|
seen.add(ip)
|
|
# Second: networks with active containers
|
|
for ip in active_gateways:
|
|
if ip not in seen:
|
|
uniq.append(ip)
|
|
seen.add(ip)
|
|
# Third: all other gateways
|
|
for ip in gateways:
|
|
if ip not in seen:
|
|
uniq.append(ip)
|
|
seen.add(ip)
|
|
|
|
if uniq:
|
|
if len(uniq) > 1:
|
|
logger.info("Container-reachable host IP candidates: %s", ", ".join(uniq))
|
|
else:
|
|
logger.info("Container-reachable host IP: %s", uniq[0])
|
|
return uniq[0]
|
|
|
|
# Nothing found: warn clearly
|
|
logger.warning(
|
|
"No container bridge IP found. If using rootless Podman (slirp4netns), there is no host bridge; publish ports or use 10.0.2.2 from the container."
|
|
)
|
|
# Returning localhost is honest only for same-namespace; keep it explicit:
|
|
return "127.0.0.1"
|
|
|
|
def cleanup(self):
|
|
"""Cleanup resources but keep docling-serve running across sessions."""
|
|
# Don't stop the process on exit - let it persist
|
|
# Just clean up our references
|
|
self._add_log_entry("TUI exiting - docling-serve will continue running")
|
|
# Note: We keep the PID file so we can reconnect in future sessions
|
|
|
|
def _save_pid(self, pid: int) -> None:
|
|
"""Save the process PID to a file for persistence across sessions."""
|
|
try:
|
|
self._pid_file.write_text(str(pid))
|
|
self._add_log_entry(f"Saved PID {pid} to {self._pid_file}")
|
|
except Exception as e:
|
|
self._add_log_entry(f"Failed to save PID file: {e}")
|
|
|
|
def _load_pid(self) -> Optional[int]:
|
|
"""Load the process PID from file."""
|
|
try:
|
|
if self._pid_file.exists():
|
|
pid_str = self._pid_file.read_text().strip()
|
|
if pid_str.isdigit():
|
|
return int(pid_str)
|
|
except Exception as e:
|
|
self._add_log_entry(f"Failed to load PID file: {e}")
|
|
return None
|
|
|
|
def _clear_pid_file(self) -> None:
|
|
"""Remove the PID file."""
|
|
try:
|
|
if self._pid_file.exists():
|
|
self._pid_file.unlink()
|
|
self._add_log_entry("Cleared PID file")
|
|
except Exception as e:
|
|
self._add_log_entry(f"Failed to clear PID file: {e}")
|
|
|
|
def _is_process_running(self, pid: int) -> bool:
|
|
"""Check if a process with the given PID is running."""
|
|
try:
|
|
# Send signal 0 to check if process exists (doesn't actually send a signal)
|
|
os.kill(pid, 0)
|
|
return True
|
|
except OSError:
|
|
return False
|
|
|
|
def _recover_from_pid_file(self) -> None:
|
|
"""Try to recover connection to existing docling-serve process from PID file."""
|
|
pid = self._load_pid()
|
|
if pid is not None:
|
|
if self._is_process_running(pid):
|
|
self._add_log_entry(f"Recovered existing docling-serve process (PID: {pid})")
|
|
# Mark as external process since we didn't start it in this session
|
|
self._external_process = True
|
|
self._running = True
|
|
# Note: We don't have a Popen object for this process, but that's OK
|
|
# We'll detect it's running via port check
|
|
else:
|
|
self._add_log_entry(f"Stale PID file found (PID: {pid} not running)")
|
|
self._clear_pid_file()
|
|
|
|
def _add_log_entry(self, message: str) -> None:
|
|
"""Add a log entry to the buffer (thread-safe)."""
|
|
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
|
|
entry = f"[{timestamp}] {message}"
|
|
|
|
with self._log_lock:
|
|
self._log_buffer.append(entry)
|
|
# Keep buffer size limited
|
|
if len(self._log_buffer) > self._max_log_lines:
|
|
self._log_buffer = self._log_buffer[-self._max_log_lines:]
|
|
|
|
def is_running(self) -> bool:
|
|
"""Check if docling serve is running (by PID only)."""
|
|
# Check if we have a direct process handle
|
|
if self._process is not None and self._process.poll() is None:
|
|
self._running = True
|
|
self._external_process = False
|
|
return True
|
|
|
|
# Check if we have a PID from file
|
|
pid = self._load_pid()
|
|
if pid is not None and self._is_process_running(pid):
|
|
self._running = True
|
|
self._external_process = True
|
|
return True
|
|
|
|
# No running process found
|
|
self._running = False
|
|
self._external_process = False
|
|
return False
|
|
|
|
def get_status(self) -> Dict[str, Any]:
|
|
"""Get current status of docling serve."""
|
|
if self.is_running():
|
|
# Try to get PID from process handle first, then from PID file
|
|
pid = None
|
|
if self._process:
|
|
pid = self._process.pid
|
|
else:
|
|
pid = self._load_pid()
|
|
|
|
return {
|
|
"status": "running",
|
|
"port": self._port,
|
|
"host": self._host,
|
|
"endpoint": f"http://{self._host}:{self._port}",
|
|
"docs_url": f"http://{self._host}:{self._port}/docs",
|
|
"ui_url": f"http://{self._host}:{self._port}/ui",
|
|
"pid": pid
|
|
}
|
|
else:
|
|
return {
|
|
"status": "stopped",
|
|
"port": self._port,
|
|
"host": self._host,
|
|
"endpoint": None,
|
|
"docs_url": None,
|
|
"ui_url": None,
|
|
"pid": None
|
|
}
|
|
|
|
async def start(self, port: int = 5001, host: Optional[str] = None, enable_ui: bool = False) -> Tuple[bool, str]:
|
|
"""Start docling serve as external process."""
|
|
if self.is_running():
|
|
return False, "Docling serve is already running"
|
|
|
|
self._port = port
|
|
# Use provided host or the bridge IP we detected in __init__
|
|
if host is not None:
|
|
self._host = host
|
|
# else: keep self._host as already set in __init__
|
|
|
|
# Check if port is already in use before trying to start
|
|
import socket
|
|
try:
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s.settimeout(0.5)
|
|
result = s.connect_ex((self._host, self._port))
|
|
s.close()
|
|
if result == 0:
|
|
return False, f"Port {self._port} on {self._host} is already in use by another process. Please stop it first."
|
|
except Exception as e:
|
|
self._add_log_entry(f"Error checking port availability: {e}")
|
|
|
|
# Clear log buffer when starting
|
|
self._log_buffer = []
|
|
self._add_log_entry("Starting docling serve as external process...")
|
|
|
|
try:
|
|
# Build command to run docling-serve
|
|
# Check if we should use uv run (look for uv in environment or check if we're in a uv project)
|
|
import shutil
|
|
if shutil.which("uv") and (os.path.exists("pyproject.toml") or os.getenv("VIRTUAL_ENV")):
|
|
cmd = [
|
|
"uv", "run", "python", "-m", "docling_serve", "run",
|
|
"--host", self._host,
|
|
"--port", str(self._port),
|
|
]
|
|
else:
|
|
cmd = [
|
|
sys.executable, "-m", "docling_serve", "run",
|
|
"--host", self._host,
|
|
"--port", str(self._port),
|
|
]
|
|
|
|
if enable_ui:
|
|
cmd.append("--enable-ui")
|
|
|
|
self._add_log_entry(f"Starting process: {' '.join(cmd)}")
|
|
|
|
# Start as subprocess
|
|
self._process = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
universal_newlines=True,
|
|
bufsize=0 # Unbuffered for real-time output
|
|
)
|
|
|
|
self._running = True
|
|
self._add_log_entry("External process started")
|
|
|
|
# Save the PID to file for persistence
|
|
self._save_pid(self._process.pid)
|
|
|
|
# Start a thread to capture output
|
|
self._start_output_capture()
|
|
|
|
# Wait for the process to start and begin listening
|
|
self._add_log_entry("Waiting for docling-serve to start listening...")
|
|
|
|
# Wait up to 10 seconds for the service to start listening
|
|
for i in range(10):
|
|
await asyncio.sleep(1.0)
|
|
|
|
# Check if process is still alive
|
|
if self._process.poll() is not None:
|
|
break
|
|
|
|
# Check if it's listening on the port
|
|
try:
|
|
import socket
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s.settimeout(0.5)
|
|
result = s.connect_ex((self._host, self._port))
|
|
s.close()
|
|
|
|
if result == 0:
|
|
self._add_log_entry(f"Docling-serve is now listening on {self._host}:{self._port}")
|
|
break
|
|
except:
|
|
pass
|
|
|
|
self._add_log_entry(f"Waiting for startup... ({i+1}/10)")
|
|
|
|
# Add a test message to verify logging is working
|
|
self._add_log_entry(f"Process PID: {self._process.pid}, Poll: {self._process.poll()}")
|
|
|
|
if self._process.poll() is not None:
|
|
# Process already exited - get return code and any output
|
|
return_code = self._process.returncode
|
|
self._add_log_entry(f"Process exited with code: {return_code}")
|
|
|
|
try:
|
|
# Try to read any remaining output
|
|
stdout_data = ""
|
|
stderr_data = ""
|
|
|
|
if self._process.stdout:
|
|
stdout_data = self._process.stdout.read()
|
|
if self._process.stderr:
|
|
stderr_data = self._process.stderr.read()
|
|
|
|
if stdout_data:
|
|
self._add_log_entry(f"Final stdout: {stdout_data[:500]}")
|
|
if stderr_data:
|
|
self._add_log_entry(f"Final stderr: {stderr_data[:500]}")
|
|
|
|
except Exception as e:
|
|
self._add_log_entry(f"Error reading final output: {e}")
|
|
|
|
self._running = False
|
|
return False, f"Docling serve process exited immediately (code: {return_code})"
|
|
|
|
return True, f"Docling serve starting on http://{host}:{port}"
|
|
|
|
except FileNotFoundError:
|
|
return False, "docling-serve not available. Please install: uv add docling-serve"
|
|
except Exception as e:
|
|
self._running = False
|
|
self._process = None
|
|
return False, f"Error starting docling serve: {str(e)}"
|
|
|
|
def _start_output_capture(self):
|
|
"""Start threads to capture subprocess stdout and stderr."""
|
|
def capture_stdout():
|
|
if not self._process or not self._process.stdout:
|
|
self._add_log_entry("No stdout pipe available")
|
|
return
|
|
|
|
self._add_log_entry("Starting stdout capture thread")
|
|
try:
|
|
while self._running and self._process and self._process.poll() is None:
|
|
line = self._process.stdout.readline()
|
|
if line:
|
|
self._add_log_entry(f"STDOUT: {line.rstrip()}")
|
|
else:
|
|
# No more output, wait a bit
|
|
time.sleep(0.1)
|
|
except Exception as e:
|
|
self._add_log_entry(f"Error capturing stdout: {e}")
|
|
finally:
|
|
self._add_log_entry("Stdout capture thread ended")
|
|
|
|
def capture_stderr():
|
|
if not self._process or not self._process.stderr:
|
|
self._add_log_entry("No stderr pipe available")
|
|
return
|
|
|
|
self._add_log_entry("Starting stderr capture thread")
|
|
try:
|
|
while self._running and self._process and self._process.poll() is None:
|
|
line = self._process.stderr.readline()
|
|
if line:
|
|
self._add_log_entry(f"STDERR: {line.rstrip()}")
|
|
else:
|
|
# No more output, wait a bit
|
|
time.sleep(0.1)
|
|
except Exception as e:
|
|
self._add_log_entry(f"Error capturing stderr: {e}")
|
|
finally:
|
|
self._add_log_entry("Stderr capture thread ended")
|
|
|
|
# Start both capture threads
|
|
stdout_thread = threading.Thread(target=capture_stdout, daemon=True)
|
|
stderr_thread = threading.Thread(target=capture_stderr, daemon=True)
|
|
|
|
stdout_thread.start()
|
|
stderr_thread.start()
|
|
|
|
self._add_log_entry("Output capture threads started")
|
|
|
|
async def stop(self) -> Tuple[bool, str]:
|
|
"""Stop docling serve."""
|
|
if not self.is_running():
|
|
return False, "Docling serve is not running"
|
|
|
|
try:
|
|
self._add_log_entry("Stopping docling-serve process")
|
|
|
|
pid_to_stop = None
|
|
|
|
if self._process:
|
|
# We have a direct process handle
|
|
pid_to_stop = self._process.pid
|
|
self._add_log_entry(f"Terminating our process (PID: {pid_to_stop})")
|
|
self._process.terminate()
|
|
|
|
# Wait for it to stop
|
|
try:
|
|
self._process.wait(timeout=10)
|
|
self._add_log_entry("Process terminated gracefully")
|
|
except subprocess.TimeoutExpired:
|
|
# Force kill if it doesn't stop gracefully
|
|
self._add_log_entry("Process didn't stop gracefully, force killing")
|
|
self._process.kill()
|
|
self._process.wait()
|
|
self._add_log_entry("Process force killed")
|
|
|
|
elif self._external_process:
|
|
# This is a process we recovered from PID file
|
|
pid_to_stop = self._load_pid()
|
|
if pid_to_stop and self._is_process_running(pid_to_stop):
|
|
self._add_log_entry(f"Stopping process from PID file (PID: {pid_to_stop})")
|
|
try:
|
|
os.kill(pid_to_stop, 15) # SIGTERM
|
|
# Wait a bit for graceful shutdown
|
|
await asyncio.sleep(2)
|
|
if self._is_process_running(pid_to_stop):
|
|
# Still running, force kill
|
|
self._add_log_entry(f"Force killing process (PID: {pid_to_stop})")
|
|
os.kill(pid_to_stop, 9) # SIGKILL
|
|
except Exception as e:
|
|
self._add_log_entry(f"Error stopping external process: {e}")
|
|
return False, f"Error stopping external process: {str(e)}"
|
|
else:
|
|
self._add_log_entry("External process not found")
|
|
return False, "Process not found"
|
|
|
|
self._running = False
|
|
self._process = None
|
|
self._external_process = False
|
|
|
|
# Clear the PID file since we intentionally stopped the service
|
|
self._clear_pid_file()
|
|
|
|
self._add_log_entry("Docling serve stopped successfully")
|
|
return True, "Docling serve stopped successfully"
|
|
|
|
except Exception as e:
|
|
self._add_log_entry(f"Error stopping docling serve: {e}")
|
|
return False, f"Error stopping docling serve: {str(e)}"
|
|
|
|
async def restart(self, port: Optional[int] = None, host: Optional[str] = None, enable_ui: bool = False) -> Tuple[bool, str]:
|
|
"""Restart docling serve."""
|
|
# Use current settings if not specified
|
|
if port is None:
|
|
port = self._port
|
|
if host is None:
|
|
host = self._host
|
|
|
|
# Stop if running
|
|
if self.is_running():
|
|
success, msg = await self.stop()
|
|
if not success:
|
|
return False, f"Failed to stop: {msg}"
|
|
|
|
# Wait a moment for cleanup
|
|
await asyncio.sleep(1)
|
|
|
|
# Start with new settings
|
|
return await self.start(port, host, enable_ui)
|
|
|
|
def add_manual_log_entry(self, message: str) -> None:
|
|
"""Add a manual log entry - useful for debugging."""
|
|
self._add_log_entry(f"MANUAL: {message}")
|
|
|
|
def get_logs(self, lines: int = 50) -> Tuple[bool, str]:
|
|
"""Get logs from the docling-serve process."""
|
|
if self.is_running():
|
|
with self._log_lock:
|
|
# If we have no logs but the service is running, it might have been started externally
|
|
if not self._log_buffer:
|
|
return True, "No logs available yet..."
|
|
|
|
# Return the most recent logs
|
|
log_count = min(lines, len(self._log_buffer))
|
|
logs = "\n".join(self._log_buffer[-log_count:])
|
|
return True, logs
|
|
else:
|
|
return True, "Docling serve is not running."
|
|
|
|
async def follow_logs(self) -> AsyncIterator[str]:
|
|
"""Follow logs from the docling-serve process in real-time."""
|
|
# First yield status message and any existing logs
|
|
status_msg = f"Docling serve is running on http://{self._host}:{self._port}"
|
|
|
|
with self._log_lock:
|
|
if self._log_buffer:
|
|
yield "\n".join(self._log_buffer)
|
|
last_log_index = len(self._log_buffer)
|
|
else:
|
|
yield "Waiting for logs..."
|
|
last_log_index = 0
|
|
|
|
# Then start monitoring for new logs
|
|
while self.is_running():
|
|
with self._log_lock:
|
|
# Check if we have new logs
|
|
if len(self._log_buffer) > last_log_index:
|
|
# Yield only the new logs
|
|
new_logs = self._log_buffer[last_log_index:]
|
|
yield "\n".join(new_logs)
|
|
last_log_index = len(self._log_buffer)
|
|
|
|
# Wait a bit before checking again
|
|
await asyncio.sleep(0.1)
|
|
|
|
# Final check for any logs that came in during shutdown
|
|
with self._log_lock:
|
|
if len(self._log_buffer) > last_log_index:
|
|
yield "\n".join(self._log_buffer[last_log_index:])
|