From 6011b7fb2fb7a73942b7c51dc99d2f594d91ec99 Mon Sep 17 00:00:00 2001 From: phact Date: Fri, 22 Aug 2025 12:40:55 -0400 Subject: [PATCH] worker logging and reliability --- pyproject.toml | 1 + src/services/document_service.py | 45 +++++++- src/utils/document_processing.py | 170 +++++++++++++++++++++++-------- uv.lock | 17 ++++ 4 files changed, 190 insertions(+), 43 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a8511901..89b70864 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "torch>=2.7.1", "uvicorn>=0.35.0", "boto3>=1.35.0", + "psutil>=7.0.0", ] [tool.uv.sources] diff --git a/src/services/document_service.py b/src/services/document_service.py index d5937aa8..575201db 100644 --- a/src/services/document_service.py +++ b/src/services/document_service.py @@ -81,6 +81,27 @@ class DocumentService: self.process_pool = process_pool self.session_manager = session_manager self._mapping_ensured = False + self._process_pool_broken = False + + def _recreate_process_pool(self): + """Recreate the process pool if it's broken""" + if self._process_pool_broken and self.process_pool: + print("[WARNING] Attempting to recreate broken process pool...") + try: + # Shutdown the old pool + self.process_pool.shutdown(wait=False) + + # Import and create a new pool + from utils.process_pool import MAX_WORKERS + from concurrent.futures import ProcessPoolExecutor + self.process_pool = ProcessPoolExecutor(max_workers=MAX_WORKERS) + self._process_pool_broken = False + print(f"[INFO] Process pool recreated with {MAX_WORKERS} workers") + return True + except Exception as e: + print(f"[ERROR] Failed to recreate process pool: {e}") + return False + return False async def process_file_common(self, file_path: str, file_hash: str = None, owner_user_id: str = None, original_filename: str = None, jwt_token: str = None, owner_name: str = None, owner_email: str = None, file_size: int = None, connector_type: str = "local"): """ @@ -290,11 +311,31 @@ class DocumentService: upload_task.successful_files += 1 except Exception as e: - print(f"[ERROR] Failed to process file {file_path}: {e}") import traceback + from concurrent.futures import BrokenExecutor + + if isinstance(e, BrokenExecutor): + print(f"[CRITICAL] Process pool broken while processing {file_path}") + print(f"[INFO] This usually indicates a worker process crashed") + print(f"[INFO] You should see detailed crash logs above from the worker process") + + # Mark pool as broken for potential recreation + self._process_pool_broken = True + + # Attempt to recreate the pool for future operations + if self._recreate_process_pool(): + print(f"[INFO] Process pool successfully recreated") + else: + print(f"[WARNING] Failed to recreate process pool - future operations may fail") + + file_task.error = f"Worker process crashed: {str(e)}" + else: + print(f"[ERROR] Failed to process file {file_path}: {e}") + file_task.error = str(e) + + print(f"[ERROR] Full traceback:") traceback.print_exc() file_task.status = TaskStatus.FAILED - file_task.error = str(e) upload_task.failed_files += 1 finally: file_task.updated_at = time.time() diff --git a/src/utils/document_processing.py b/src/utils/document_processing.py index 78d5a56e..a4ca66ce 100644 --- a/src/utils/document_processing.py +++ b/src/utils/document_processing.py @@ -33,9 +33,9 @@ def get_worker_converter(): os.environ.pop('USE_CPU_ONLY', None) os.environ['HF_HUB_DISABLE_PROGRESS_BARS'] = '1' # Still disable progress bars - print(f"🔧 Initializing DocumentConverter in worker process (PID: {os.getpid()})") + print(f"[WORKER {os.getpid()}] Initializing DocumentConverter in worker process") _worker_converter = DocumentConverter() - print(f"✅ DocumentConverter ready in worker process (PID: {os.getpid()})") + print(f"[WORKER {os.getpid()}] DocumentConverter ready in worker process") return _worker_converter @@ -101,48 +101,136 @@ def extract_relevant(doc_dict: dict) -> dict: def process_document_sync(file_path: str): """Synchronous document processing function for multiprocessing""" + import traceback + import psutil + import sys from collections import defaultdict - # Get the cached converter for this worker - converter = get_worker_converter() + process = psutil.Process() + start_memory = process.memory_info().rss / 1024 / 1024 # MB - # Compute file hash - sha256 = hashlib.sha256() - with open(file_path, "rb") as f: - while True: - chunk = f.read(1 << 20) - if not chunk: - break - sha256.update(chunk) - file_hash = sha256.hexdigest() - - # Convert with docling - result = converter.convert(file_path) - full_doc = result.document.export_to_dict() - - # Extract relevant content (same logic as extract_relevant) - origin = full_doc.get("origin", {}) - texts = full_doc.get("texts", []) + try: + print(f"[WORKER {os.getpid()}] Starting document processing: {file_path}") + print(f"[WORKER {os.getpid()}] Initial memory usage: {start_memory:.1f} MB") + + # Check file size + try: + file_size = os.path.getsize(file_path) / 1024 / 1024 # MB + print(f"[WORKER {os.getpid()}] File size: {file_size:.1f} MB") + except OSError as e: + print(f"[WORKER {os.getpid()}] WARNING: Cannot get file size: {e}") + file_size = 0 + + # Get the cached converter for this worker + try: + print(f"[WORKER {os.getpid()}] Getting document converter...") + converter = get_worker_converter() + memory_after_converter = process.memory_info().rss / 1024 / 1024 + print(f"[WORKER {os.getpid()}] Memory after converter init: {memory_after_converter:.1f} MB") + except Exception as e: + print(f"[WORKER {os.getpid()}] ERROR: Failed to initialize converter: {e}") + traceback.print_exc() + raise + + # Compute file hash + try: + print(f"[WORKER {os.getpid()}] Computing file hash...") + sha256 = hashlib.sha256() + with open(file_path, "rb") as f: + while True: + chunk = f.read(1 << 20) + if not chunk: + break + sha256.update(chunk) + file_hash = sha256.hexdigest() + print(f"[WORKER {os.getpid()}] File hash computed: {file_hash[:12]}...") + except Exception as e: + print(f"[WORKER {os.getpid()}] ERROR: Failed to compute file hash: {e}") + traceback.print_exc() + raise + + # Convert with docling + try: + print(f"[WORKER {os.getpid()}] Starting docling conversion...") + memory_before_convert = process.memory_info().rss / 1024 / 1024 + print(f"[WORKER {os.getpid()}] Memory before conversion: {memory_before_convert:.1f} MB") + + result = converter.convert(file_path) + + memory_after_convert = process.memory_info().rss / 1024 / 1024 + print(f"[WORKER {os.getpid()}] Memory after conversion: {memory_after_convert:.1f} MB") + print(f"[WORKER {os.getpid()}] Docling conversion completed") + + full_doc = result.document.export_to_dict() + memory_after_export = process.memory_info().rss / 1024 / 1024 + print(f"[WORKER {os.getpid()}] Memory after export: {memory_after_export:.1f} MB") + + except Exception as e: + print(f"[WORKER {os.getpid()}] ERROR: Failed during docling conversion: {e}") + print(f"[WORKER {os.getpid()}] Current memory usage: {process.memory_info().rss / 1024 / 1024:.1f} MB") + traceback.print_exc() + raise + + # Extract relevant content (same logic as extract_relevant) + try: + print(f"[WORKER {os.getpid()}] Extracting relevant content...") + origin = full_doc.get("origin", {}) + texts = full_doc.get("texts", []) + print(f"[WORKER {os.getpid()}] Found {len(texts)} text fragments") - page_texts = defaultdict(list) - for txt in texts: - prov = txt.get("prov", []) - page_no = prov[0].get("page_no") if prov else None - if page_no is not None: - page_texts[page_no].append(txt.get("text", "").strip()) + page_texts = defaultdict(list) + for txt in texts: + prov = txt.get("prov", []) + page_no = prov[0].get("page_no") if prov else None + if page_no is not None: + page_texts[page_no].append(txt.get("text", "").strip()) - chunks = [] - for page in sorted(page_texts): - joined = "\n".join(page_texts[page]) - chunks.append({ - "page": page, - "text": joined - }) + chunks = [] + for page in sorted(page_texts): + joined = "\n".join(page_texts[page]) + chunks.append({ + "page": page, + "text": joined + }) + + print(f"[WORKER {os.getpid()}] Created {len(chunks)} chunks from {len(page_texts)} pages") + + except Exception as e: + print(f"[WORKER {os.getpid()}] ERROR: Failed during content extraction: {e}") + traceback.print_exc() + raise - return { - "id": file_hash, - "filename": origin.get("filename"), - "mimetype": origin.get("mimetype"), - "chunks": chunks, - "file_path": file_path - } \ No newline at end of file + final_memory = process.memory_info().rss / 1024 / 1024 + memory_delta = final_memory - start_memory + print(f"[WORKER {os.getpid()}] Document processing completed successfully") + print(f"[WORKER {os.getpid()}] Final memory: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)") + + return { + "id": file_hash, + "filename": origin.get("filename"), + "mimetype": origin.get("mimetype"), + "chunks": chunks, + "file_path": file_path + } + + except Exception as e: + final_memory = process.memory_info().rss / 1024 / 1024 + memory_delta = final_memory - start_memory + print(f"[WORKER {os.getpid()}] FATAL ERROR in process_document_sync") + print(f"[WORKER {os.getpid()}] File: {file_path}") + print(f"[WORKER {os.getpid()}] Python version: {sys.version}") + print(f"[WORKER {os.getpid()}] Memory at crash: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)") + print(f"[WORKER {os.getpid()}] Error: {type(e).__name__}: {e}") + print(f"[WORKER {os.getpid()}] Full traceback:") + traceback.print_exc() + + # Try to get more system info before crashing + try: + import platform + print(f"[WORKER {os.getpid()}] System: {platform.system()} {platform.release()}") + print(f"[WORKER {os.getpid()}] Architecture: {platform.machine()}") + except: + pass + + # Re-raise to trigger BrokenProcessPool in main process + raise \ No newline at end of file diff --git a/uv.lock b/uv.lock index 85ba9b9b..bac2c8e2 100644 --- a/uv.lock +++ b/uv.lock @@ -1382,6 +1382,7 @@ dependencies = [ { name = "httpx" }, { name = "msal" }, { name = "opensearch-py", extra = ["async"] }, + { name = "psutil" }, { name = "pyjwt" }, { name = "python-multipart" }, { name = "starlette" }, @@ -1402,6 +1403,7 @@ requires-dist = [ { name = "httpx", specifier = ">=0.27.0" }, { name = "msal", specifier = ">=1.29.0" }, { name = "opensearch-py", extras = ["async"], specifier = ">=3.0.0" }, + { name = "psutil", specifier = ">=7.0.0" }, { name = "pyjwt", specifier = ">=2.8.0" }, { name = "python-multipart", specifier = ">=0.0.20" }, { name = "starlette", specifier = ">=0.47.1" }, @@ -1597,6 +1599,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f7/af/ab3c51ab7507a7325e98ffe691d9495ee3d3aa5f589afad65ec920d39821/protobuf-6.31.1-py3-none-any.whl", hash = "sha256:720a6c7e6b77288b85063569baae8536671b39f15cc22037ec7045658d80489e", size = 168724, upload-time = "2025-05-28T19:25:53.926Z" }, ] +[[package]] +name = "psutil" +version = "7.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/2a/80/336820c1ad9286a4ded7e845b2eccfcb27851ab8ac6abece774a6ff4d3de/psutil-7.0.0.tar.gz", hash = "sha256:7be9c3eba38beccb6495ea33afd982a44074b78f28c434a1f51cc07fd315c456", size = 497003, upload-time = "2025-02-13T21:54:07.946Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ed/e6/2d26234410f8b8abdbf891c9da62bee396583f713fb9f3325a4760875d22/psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25", size = 238051, upload-time = "2025-02-13T21:54:12.36Z" }, + { url = "https://files.pythonhosted.org/packages/04/8b/30f930733afe425e3cbfc0e1468a30a18942350c1a8816acfade80c005c4/psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da", size = 239535, upload-time = "2025-02-13T21:54:16.07Z" }, + { url = "https://files.pythonhosted.org/packages/2a/ed/d362e84620dd22876b55389248e522338ed1bf134a5edd3b8231d7207f6d/psutil-7.0.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fcee592b4c6f146991ca55919ea3d1f8926497a713ed7faaf8225e174581e91", size = 275004, upload-time = "2025-02-13T21:54:18.662Z" }, + { url = "https://files.pythonhosted.org/packages/bf/b9/b0eb3f3cbcb734d930fdf839431606844a825b23eaf9a6ab371edac8162c/psutil-7.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b1388a4f6875d7e2aff5c4ca1cc16c545ed41dd8bb596cefea80111db353a34", size = 277986, upload-time = "2025-02-13T21:54:21.811Z" }, + { url = "https://files.pythonhosted.org/packages/eb/a2/709e0fe2f093556c17fbafda93ac032257242cabcc7ff3369e2cb76a97aa/psutil-7.0.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a5f098451abc2828f7dc6b58d44b532b22f2088f4999a937557b603ce72b1993", size = 279544, upload-time = "2025-02-13T21:54:24.68Z" }, + { url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053, upload-time = "2025-02-13T21:54:34.31Z" }, + { url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885, upload-time = "2025-02-13T21:54:37.486Z" }, +] + [[package]] name = "pyasn1" version = "0.6.1"