worker logging and reliability

phact 2025-08-22 12:40:55 -04:00
parent 629a98351b
commit 6011b7fb2f
4 changed files with 190 additions and 43 deletions


@@ -21,6 +21,7 @@ dependencies = [
"torch>=2.7.1",
"uvicorn>=0.35.0",
"boto3>=1.35.0",
"psutil>=7.0.0",
]
[tool.uv.sources]


@@ -81,6 +81,27 @@ class DocumentService:
self.process_pool = process_pool
self.session_manager = session_manager
self._mapping_ensured = False
self._process_pool_broken = False
def _recreate_process_pool(self):
"""Recreate the process pool if it's broken"""
if self._process_pool_broken and self.process_pool:
print("[WARNING] Attempting to recreate broken process pool...")
try:
# Shutdown the old pool
self.process_pool.shutdown(wait=False)
# Import and create a new pool
from utils.process_pool import MAX_WORKERS
from concurrent.futures import ProcessPoolExecutor
self.process_pool = ProcessPoolExecutor(max_workers=MAX_WORKERS)
self._process_pool_broken = False
print(f"[INFO] Process pool recreated with {MAX_WORKERS} workers")
return True
except Exception as e:
print(f"[ERROR] Failed to recreate process pool: {e}")
return False
return False
async def process_file_common(self, file_path: str, file_hash: str = None, owner_user_id: str = None, original_filename: str = None, jwt_token: str = None, owner_name: str = None, owner_email: str = None, file_size: int = None, connector_type: str = "local"):
"""
@@ -290,11 +311,31 @@ class DocumentService:
upload_task.successful_files += 1
except Exception as e:
print(f"[ERROR] Failed to process file {file_path}: {e}")
import traceback
from concurrent.futures import BrokenExecutor
if isinstance(e, BrokenExecutor):
print(f"[CRITICAL] Process pool broken while processing {file_path}")
print(f"[INFO] This usually indicates a worker process crashed")
print(f"[INFO] You should see detailed crash logs above from the worker process")
# Mark pool as broken for potential recreation
self._process_pool_broken = True
# Attempt to recreate the pool for future operations
if self._recreate_process_pool():
print(f"[INFO] Process pool successfully recreated")
else:
print(f"[WARNING] Failed to recreate process pool - future operations may fail")
file_task.error = f"Worker process crashed: {str(e)}"
else:
print(f"[ERROR] Failed to process file {file_path}: {e}")
file_task.error = str(e)
print(f"[ERROR] Full traceback:")
traceback.print_exc()
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
upload_task.failed_files += 1
finally:
file_task.updated_at = time.time()

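A note on the recovery pattern this hunk introduces: once a worker process dies, the ProcessPoolExecutor cannot be reused, so the service flags it and builds a replacement. Below is a minimal, self-contained sketch of that pattern, not code from this repository; crashy_task and the MAX_WORKERS value here are made up for illustration. A worker that dies hard (for example via os._exit) surfaces in the parent as concurrent.futures.BrokenExecutor (concretely BrokenProcessPool), at which point the old pool is shut down and a fresh one is created for later submissions.

# Illustrative sketch (not part of this commit): detect a crashed worker via
# BrokenExecutor and swap in a fresh ProcessPoolExecutor for later work.
import asyncio
import os
from concurrent.futures import ProcessPoolExecutor, BrokenExecutor

MAX_WORKERS = 2  # assumption; the real value lives in utils.process_pool

def crashy_task(n: int) -> int:
    # A hard crash (os._exit) kills the worker and breaks the pool,
    # unlike a normal exception, which is pickled back to the parent.
    if n == 13:
        os._exit(1)
    return n * n

async def run_with_recovery(values):
    pool = ProcessPoolExecutor(max_workers=MAX_WORKERS)
    loop = asyncio.get_running_loop()
    results = []
    for v in values:
        try:
            results.append(await loop.run_in_executor(pool, crashy_task, v))
        except BrokenExecutor:
            # A broken pool is unusable; shut it down and recreate it.
            pool.shutdown(wait=False)
            pool = ProcessPoolExecutor(max_workers=MAX_WORKERS)
            results.append(None)
    pool.shutdown()
    return results

if __name__ == "__main__":
    print(asyncio.run(run_with_recovery([2, 13, 4])))

Ordinary exceptions raised inside a worker are pickled back to the parent and do not break the pool; only worker-process death (segfault, OOM kill, os._exit) does, which is why the change above distinguishes BrokenExecutor from other failures.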

@@ -33,9 +33,9 @@ def get_worker_converter():
os.environ.pop('USE_CPU_ONLY', None)
os.environ['HF_HUB_DISABLE_PROGRESS_BARS'] = '1' # Still disable progress bars
print(f"🔧 Initializing DocumentConverter in worker process (PID: {os.getpid()})")
print(f"[WORKER {os.getpid()}] Initializing DocumentConverter in worker process")
_worker_converter = DocumentConverter()
print(f"✅ DocumentConverter ready in worker process (PID: {os.getpid()})")
print(f"[WORKER {os.getpid()}] DocumentConverter ready in worker process")
return _worker_converter
@@ -101,48 +101,136 @@ def extract_relevant(doc_dict: dict) -> dict:
def process_document_sync(file_path: str):
"""Synchronous document processing function for multiprocessing"""
import traceback
import psutil
import sys
from collections import defaultdict
# Get the cached converter for this worker
converter = get_worker_converter()
process = psutil.Process()
start_memory = process.memory_info().rss / 1024 / 1024 # MB
# Compute file hash
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
while True:
chunk = f.read(1 << 20)
if not chunk:
break
sha256.update(chunk)
file_hash = sha256.hexdigest()
# Convert with docling
result = converter.convert(file_path)
full_doc = result.document.export_to_dict()
# Extract relevant content (same logic as extract_relevant)
origin = full_doc.get("origin", {})
texts = full_doc.get("texts", [])
try:
print(f"[WORKER {os.getpid()}] Starting document processing: {file_path}")
print(f"[WORKER {os.getpid()}] Initial memory usage: {start_memory:.1f} MB")
# Check file size
try:
file_size = os.path.getsize(file_path) / 1024 / 1024 # MB
print(f"[WORKER {os.getpid()}] File size: {file_size:.1f} MB")
except OSError as e:
print(f"[WORKER {os.getpid()}] WARNING: Cannot get file size: {e}")
file_size = 0
# Get the cached converter for this worker
try:
print(f"[WORKER {os.getpid()}] Getting document converter...")
converter = get_worker_converter()
memory_after_converter = process.memory_info().rss / 1024 / 1024
print(f"[WORKER {os.getpid()}] Memory after converter init: {memory_after_converter:.1f} MB")
except Exception as e:
print(f"[WORKER {os.getpid()}] ERROR: Failed to initialize converter: {e}")
traceback.print_exc()
raise
# Compute file hash
try:
print(f"[WORKER {os.getpid()}] Computing file hash...")
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
while True:
chunk = f.read(1 << 20)
if not chunk:
break
sha256.update(chunk)
file_hash = sha256.hexdigest()
print(f"[WORKER {os.getpid()}] File hash computed: {file_hash[:12]}...")
except Exception as e:
print(f"[WORKER {os.getpid()}] ERROR: Failed to compute file hash: {e}")
traceback.print_exc()
raise
# Convert with docling
try:
print(f"[WORKER {os.getpid()}] Starting docling conversion...")
memory_before_convert = process.memory_info().rss / 1024 / 1024
print(f"[WORKER {os.getpid()}] Memory before conversion: {memory_before_convert:.1f} MB")
result = converter.convert(file_path)
memory_after_convert = process.memory_info().rss / 1024 / 1024
print(f"[WORKER {os.getpid()}] Memory after conversion: {memory_after_convert:.1f} MB")
print(f"[WORKER {os.getpid()}] Docling conversion completed")
full_doc = result.document.export_to_dict()
memory_after_export = process.memory_info().rss / 1024 / 1024
print(f"[WORKER {os.getpid()}] Memory after export: {memory_after_export:.1f} MB")
except Exception as e:
print(f"[WORKER {os.getpid()}] ERROR: Failed during docling conversion: {e}")
print(f"[WORKER {os.getpid()}] Current memory usage: {process.memory_info().rss / 1024 / 1024:.1f} MB")
traceback.print_exc()
raise
# Extract relevant content (same logic as extract_relevant)
try:
print(f"[WORKER {os.getpid()}] Extracting relevant content...")
origin = full_doc.get("origin", {})
texts = full_doc.get("texts", [])
print(f"[WORKER {os.getpid()}] Found {len(texts)} text fragments")
page_texts = defaultdict(list)
for txt in texts:
prov = txt.get("prov", [])
page_no = prov[0].get("page_no") if prov else None
if page_no is not None:
page_texts[page_no].append(txt.get("text", "").strip())
page_texts = defaultdict(list)
for txt in texts:
prov = txt.get("prov", [])
page_no = prov[0].get("page_no") if prov else None
if page_no is not None:
page_texts[page_no].append(txt.get("text", "").strip())
chunks = []
for page in sorted(page_texts):
joined = "\n".join(page_texts[page])
chunks.append({
"page": page,
"text": joined
})
chunks = []
for page in sorted(page_texts):
joined = "\n".join(page_texts[page])
chunks.append({
"page": page,
"text": joined
})
print(f"[WORKER {os.getpid()}] Created {len(chunks)} chunks from {len(page_texts)} pages")
except Exception as e:
print(f"[WORKER {os.getpid()}] ERROR: Failed during content extraction: {e}")
traceback.print_exc()
raise
return {
"id": file_hash,
"filename": origin.get("filename"),
"mimetype": origin.get("mimetype"),
"chunks": chunks,
"file_path": file_path
}
final_memory = process.memory_info().rss / 1024 / 1024
memory_delta = final_memory - start_memory
print(f"[WORKER {os.getpid()}] Document processing completed successfully")
print(f"[WORKER {os.getpid()}] Final memory: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)")
return {
"id": file_hash,
"filename": origin.get("filename"),
"mimetype": origin.get("mimetype"),
"chunks": chunks,
"file_path": file_path
}
except Exception as e:
final_memory = process.memory_info().rss / 1024 / 1024
memory_delta = final_memory - start_memory
print(f"[WORKER {os.getpid()}] FATAL ERROR in process_document_sync")
print(f"[WORKER {os.getpid()}] File: {file_path}")
print(f"[WORKER {os.getpid()}] Python version: {sys.version}")
print(f"[WORKER {os.getpid()}] Memory at crash: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)")
print(f"[WORKER {os.getpid()}] Error: {type(e).__name__}: {e}")
print(f"[WORKER {os.getpid()}] Full traceback:")
traceback.print_exc()
# Try to get more system info before crashing
try:
import platform
print(f"[WORKER {os.getpid()}] System: {platform.system()} {platform.release()}")
print(f"[WORKER {os.getpid()}] Architecture: {platform.machine()}")
except:
pass
# Re-raise to trigger BrokenProcessPool in main process
raise

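The memory figures logged by the worker come from psutil's resident-set-size reading. Here is a small sketch of that measurement, separate from the commit's code; rss_mb and log_memory are illustrative helper names, not functions from the repository.

# Illustrative sketch (not part of this commit): per-step RSS logging with psutil,
# the same measurement the worker prints around converter init, conversion, and export.
import os
import psutil

def rss_mb() -> float:
    # Resident set size of the current process, in megabytes.
    return psutil.Process().memory_info().rss / 1024 / 1024

def log_memory(step: str, baseline_mb: float) -> None:
    current = rss_mb()
    print(f"[WORKER {os.getpid()}] {step}: {current:.1f} MB (delta +{current - baseline_mb:.1f} MB)")

if __name__ == "__main__":
    start = rss_mb()
    data = [bytes(1 << 20) for _ in range(50)]  # hold roughly 50 MB so the delta is visible
    log_memory("after allocation", start)
    del data
    log_memory("after release", start)

Logging the RSS delta around each stage makes it easier to tell an out-of-memory kill from an ordinary exception when a worker process disappears.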
uv.lock generated

@@ -1382,6 +1382,7 @@ dependencies = [
{ name = "httpx" },
{ name = "msal" },
{ name = "opensearch-py", extra = ["async"] },
{ name = "psutil" },
{ name = "pyjwt" },
{ name = "python-multipart" },
{ name = "starlette" },
@@ -1402,6 +1403,7 @@ requires-dist = [
{ name = "httpx", specifier = ">=0.27.0" },
{ name = "msal", specifier = ">=1.29.0" },
{ name = "opensearch-py", extras = ["async"], specifier = ">=3.0.0" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pyjwt", specifier = ">=2.8.0" },
{ name = "python-multipart", specifier = ">=0.0.20" },
{ name = "starlette", specifier = ">=0.47.1" },
@@ -1597,6 +1599,21 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f7/af/ab3c51ab7507a7325e98ffe691d9495ee3d3aa5f589afad65ec920d39821/protobuf-6.31.1-py3-none-any.whl", hash = "sha256:720a6c7e6b77288b85063569baae8536671b39f15cc22037ec7045658d80489e", size = 168724, upload-time = "2025-05-28T19:25:53.926Z" },
]
[[package]]
name = "psutil"
version = "7.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2a/80/336820c1ad9286a4ded7e845b2eccfcb27851ab8ac6abece774a6ff4d3de/psutil-7.0.0.tar.gz", hash = "sha256:7be9c3eba38beccb6495ea33afd982a44074b78f28c434a1f51cc07fd315c456", size = 497003, upload-time = "2025-02-13T21:54:07.946Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ed/e6/2d26234410f8b8abdbf891c9da62bee396583f713fb9f3325a4760875d22/psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25", size = 238051, upload-time = "2025-02-13T21:54:12.36Z" },
{ url = "https://files.pythonhosted.org/packages/04/8b/30f930733afe425e3cbfc0e1468a30a18942350c1a8816acfade80c005c4/psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da", size = 239535, upload-time = "2025-02-13T21:54:16.07Z" },
{ url = "https://files.pythonhosted.org/packages/2a/ed/d362e84620dd22876b55389248e522338ed1bf134a5edd3b8231d7207f6d/psutil-7.0.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fcee592b4c6f146991ca55919ea3d1f8926497a713ed7faaf8225e174581e91", size = 275004, upload-time = "2025-02-13T21:54:18.662Z" },
{ url = "https://files.pythonhosted.org/packages/bf/b9/b0eb3f3cbcb734d930fdf839431606844a825b23eaf9a6ab371edac8162c/psutil-7.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b1388a4f6875d7e2aff5c4ca1cc16c545ed41dd8bb596cefea80111db353a34", size = 277986, upload-time = "2025-02-13T21:54:21.811Z" },
{ url = "https://files.pythonhosted.org/packages/eb/a2/709e0fe2f093556c17fbafda93ac032257242cabcc7ff3369e2cb76a97aa/psutil-7.0.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a5f098451abc2828f7dc6b58d44b532b22f2088f4999a937557b603ce72b1993", size = 279544, upload-time = "2025-02-13T21:54:24.68Z" },
{ url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053, upload-time = "2025-02-13T21:54:34.31Z" },
{ url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885, upload-time = "2025-02-13T21:54:37.486Z" },
]
[[package]]
name = "pyasn1"
version = "0.6.1"