Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
Edwin Jose
64a543b645 Replace print statements with logger across codebase
Replaced all print statements with logger calls using a centralized logging configuration. This improves consistency, enables log level control, and supports better diagnostics and production readiness.
2025-09-04 17:50:07 -04:00
12 changed files with 103 additions and 78 deletions

View file

@ -3,7 +3,9 @@ from starlette.responses import JSONResponse
from typing import Optional from typing import Optional
from session_manager import User from session_manager import User
from config.settings import is_no_auth_mode from config.settings import is_no_auth_mode
from utils.logging_config import get_logger
logger = get_logger(__name__)
def get_current_user(request: Request, session_manager) -> Optional[User]: def get_current_user(request: Request, session_manager) -> Optional[User]:
"""Extract current user from request cookies""" """Extract current user from request cookies"""
@ -25,7 +27,7 @@ def require_auth(session_manager):
async def wrapper(request: Request): async def wrapper(request: Request):
# In no-auth mode, bypass authentication entirely # In no-auth mode, bypass authentication entirely
if is_no_auth_mode(): if is_no_auth_mode():
print(f"[DEBUG] No-auth mode: Creating anonymous user") logger.debug("No-auth mode: Creating anonymous user")
# Create an anonymous user object so endpoints don't break # Create an anonymous user object so endpoints don't break
from session_manager import User from session_manager import User
from datetime import datetime from datetime import datetime
@ -33,7 +35,7 @@ def require_auth(session_manager):
from session_manager import AnonymousUser from session_manager import AnonymousUser
request.state.user = AnonymousUser() request.state.user = AnonymousUser()
request.state.jwt_token = None # No JWT in no-auth mode request.state.jwt_token = None # No JWT in no-auth mode
print(f"[DEBUG] Set user_id=anonymous, jwt_token=None") logger.debug("Set user_id=anonymous, jwt_token=None")
return await handler(request) return await handler(request)
user = get_current_user(request, session_manager) user = get_current_user(request, session_manager)

View file

@ -8,7 +8,9 @@ from .google_drive import GoogleDriveConnector
from .sharepoint import SharePointConnector from .sharepoint import SharePointConnector
from .onedrive import OneDriveConnector from .onedrive import OneDriveConnector
from .connection_manager import ConnectionManager from .connection_manager import ConnectionManager
from utils.logging_config import get_logger
logger = get_logger(__name__)
class ConnectorService: class ConnectorService:
"""Service to manage document connectors and process files""" """Service to manage document connectors and process files"""
@ -63,7 +65,7 @@ class ConnectorService:
doc_service = DocumentService(session_manager=self.session_manager) doc_service = DocumentService(session_manager=self.session_manager)
print(f"[DEBUG] Processing connector document with ID: {document.id}") logger.debug(f"Processing connector document with ID: {document.id}")
# Process using the existing pipeline but with connector document metadata # Process using the existing pipeline but with connector document metadata
result = await doc_service.process_file_common( result = await doc_service.process_file_common(
@ -78,7 +80,7 @@ class ConnectorService:
connector_type=connector_type, connector_type=connector_type,
) )
print(f"[DEBUG] Document processing result: {result}") logger.debug(f"Document processing result: {result}")
# If successfully indexed or already exists, update the indexed documents with connector metadata # If successfully indexed or already exists, update the indexed documents with connector metadata
if result["status"] in ["indexed", "unchanged"]: if result["status"] in ["indexed", "unchanged"]:
@ -105,7 +107,7 @@ class ConnectorService:
jwt_token: str = None, jwt_token: str = None,
): ):
"""Update indexed chunks with connector-specific metadata""" """Update indexed chunks with connector-specific metadata"""
print(f"[DEBUG] Looking for chunks with document_id: {document.id}") logger.debug(f"Looking for chunks with document_id: {document.id}")
# Find all chunks for this document # Find all chunks for this document
query = {"query": {"term": {"document_id": document.id}}} query = {"query": {"term": {"document_id": document.id}}}
@ -118,25 +120,23 @@ class ConnectorService:
try: try:
response = await opensearch_client.search(index=self.index_name, body=query) response = await opensearch_client.search(index=self.index_name, body=query)
except Exception as e: except Exception as e:
print( logger.error(
f"[ERROR] OpenSearch search failed for connector metadata update: {e}" f"[ERROR] OpenSearch search failed for connector metadata update: {e}"
) )
print(f"[ERROR] Search query: {query}") logger.error(f"Search query: {query}")
raise raise
print(f"[DEBUG] Search query: {query}") logger.debug(f"Search query: {query}")
print( logger.debug(f"Found {len(response['hits']['hits'])} chunks matching document_id: {document.id}")
f"[DEBUG] Found {len(response['hits']['hits'])} chunks matching document_id: {document.id}"
)
# Update each chunk with connector metadata # Update each chunk with connector metadata
print( logger.debug(
f"[DEBUG] Updating {len(response['hits']['hits'])} chunks with connector_type: {connector_type}" f"[DEBUG] Updating {len(response['hits']['hits'])} chunks with connector_type: {connector_type}"
) )
for hit in response["hits"]["hits"]: for hit in response["hits"]["hits"]:
chunk_id = hit["_id"] chunk_id = hit["_id"]
current_connector_type = hit["_source"].get("connector_type", "unknown") current_connector_type = hit["_source"].get("connector_type", "unknown")
print( logger.debug(
f"[DEBUG] Chunk {chunk_id}: current connector_type = {current_connector_type}, updating to {connector_type}" f"[DEBUG] Chunk {chunk_id}: current connector_type = {current_connector_type}, updating to {connector_type}"
) )
@ -165,10 +165,10 @@ class ConnectorService:
await opensearch_client.update( await opensearch_client.update(
index=self.index_name, id=chunk_id, body=update_body index=self.index_name, id=chunk_id, body=update_body
) )
print(f"[DEBUG] Updated chunk {chunk_id} with connector metadata") logger.debug(f"Updated chunk {chunk_id} with connector metadata")
except Exception as e: except Exception as e:
print(f"[ERROR] OpenSearch update failed for chunk {chunk_id}: {e}") logger.error(f"OpenSearch update failed for chunk {chunk_id}: {e}")
print(f"[ERROR] Update body: {update_body}") logger.error(f"Update body: {update_body}")
raise raise
def _get_file_extension(self, mimetype: str) -> str: def _get_file_extension(self, mimetype: str) -> str:
@ -201,7 +201,7 @@ class ConnectorService:
"TaskService not available - connector sync requires task service dependency" "TaskService not available - connector sync requires task service dependency"
) )
print( logger.debug(
f"[DEBUG] Starting sync for connection {connection_id}, max_files={max_files}" f"[DEBUG] Starting sync for connection {connection_id}, max_files={max_files}"
) )
@ -211,7 +211,7 @@ class ConnectorService:
f"Connection '{connection_id}' not found or not authenticated" f"Connection '{connection_id}' not found or not authenticated"
) )
print(f"[DEBUG] Got connector, authenticated: {connector.is_authenticated}") logger.debug(f"Got connector, authenticated: {connector.is_authenticated}")
if not connector.is_authenticated: if not connector.is_authenticated:
raise ValueError(f"Connection '{connection_id}' not authenticated") raise ValueError(f"Connection '{connection_id}' not authenticated")
@ -225,11 +225,11 @@ class ConnectorService:
while True: while True:
# List files from connector with limit # List files from connector with limit
print( logger.debug(
f"[DEBUG] Calling list_files with page_size={page_size}, page_token={page_token}" f"[DEBUG] Calling list_files with page_size={page_size}, page_token={page_token}"
) )
file_list = await connector.list_files(page_token, limit=page_size) file_list = await connector.list_files(page_token, limit=page_size)
print(f"[DEBUG] Got {len(file_list.get('files', []))} files") logger.debug(f"Got {len(file_list.get('files', []))} files")
files = file_list["files"] files = file_list["files"]
if not files: if not files:

View file

@ -1,7 +1,9 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any, Dict from typing import Any, Dict
from .tasks import UploadTask, FileTask from .tasks import UploadTask, FileTask
from utils.logging_config import get_logger
logger = get_logger(__name__)
class TaskProcessor(ABC): class TaskProcessor(ABC):
"""Abstract base class for task processors""" """Abstract base class for task processors"""
@ -225,10 +227,10 @@ class S3FileProcessor(TaskProcessor):
index=INDEX_NAME, id=chunk_id, body=chunk_doc index=INDEX_NAME, id=chunk_id, body=chunk_doc
) )
except Exception as e: except Exception as e:
print( logger.error(
f"[ERROR] OpenSearch indexing failed for S3 chunk {chunk_id}: {e}" f"[ERROR] OpenSearch indexing failed for S3 chunk {chunk_id}: {e}"
) )
print(f"[ERROR] Chunk document: {chunk_doc}") logger.error(f"Chunk document: {chunk_doc}")
raise raise
result = {"status": "indexed", "id": slim_doc["id"]} result = {"status": "indexed", "id": slim_doc["id"]}

View file

@ -7,7 +7,9 @@ from typing import Dict
from models.tasks import TaskStatus, UploadTask, FileTask from models.tasks import TaskStatus, UploadTask, FileTask
from src.utils.gpu_detection import get_worker_count from src.utils.gpu_detection import get_worker_count
from utils.logging_config import get_logger
logger = get_logger(__name__)
class TaskService: class TaskService:
def __init__(self, document_service=None, process_pool=None): def __init__(self, document_service=None, process_pool=None):
@ -104,7 +106,7 @@ class TaskService:
await asyncio.gather(*tasks, return_exceptions=True) await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e: except Exception as e:
print(f"[ERROR] Background upload processor failed for task {task_id}: {e}") logger.error(f"Background upload processor failed for task {task_id}: {e}")
import traceback import traceback
traceback.print_exc() traceback.print_exc()
@ -136,7 +138,7 @@ class TaskService:
try: try:
await processor.process_item(upload_task, item, file_task) await processor.process_item(upload_task, item, file_task)
except Exception as e: except Exception as e:
print(f"[ERROR] Failed to process item {item}: {e}") logger.error(f"Failed to process item {item}: {e}")
import traceback import traceback
traceback.print_exc() traceback.print_exc()
@ -157,13 +159,13 @@ class TaskService:
upload_task.updated_at = time.time() upload_task.updated_at = time.time()
except asyncio.CancelledError: except asyncio.CancelledError:
print(f"[INFO] Background processor for task {task_id} was cancelled") logger.info(f"Background processor for task {task_id} was cancelled")
if user_id in self.task_store and task_id in self.task_store[user_id]: if user_id in self.task_store and task_id in self.task_store[user_id]:
# Task status and pending files already handled by cancel_task() # Task status and pending files already handled by cancel_task()
pass pass
raise # Re-raise to properly handle cancellation raise # Re-raise to properly handle cancellation
except Exception as e: except Exception as e:
print(f"[ERROR] Background custom processor failed for task {task_id}: {e}") logger.error(f"Background custom processor failed for task {task_id}: {e}")
import traceback import traceback
traceback.print_exc() traceback.print_exc()

View file

@ -6,7 +6,9 @@ from typing import Dict, Optional, Any
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
import os import os
from utils.logging_config import get_logger
logger = get_logger(__name__)
@dataclass @dataclass
class User: class User:
@ -93,13 +95,13 @@ class SessionManager:
if response.status_code == 200: if response.status_code == 200:
return response.json() return response.json()
else: else:
print( logger.error(
f"Failed to get user info: {response.status_code} {response.text}" f"Failed to get user info: {response.status_code} {response.text}"
) )
return None return None
except Exception as e: except Exception as e:
print(f"Error getting user info: {e}") logger.error(f"Error getting user info: {e}")
return None return None
async def create_user_session( async def create_user_session(
@ -186,19 +188,19 @@ class SessionManager:
"""Get or create OpenSearch client for user with their JWT""" """Get or create OpenSearch client for user with their JWT"""
from config.settings import is_no_auth_mode from config.settings import is_no_auth_mode
print( logger.debug(
f"[DEBUG] get_user_opensearch_client: user_id={user_id}, jwt_token={'None' if jwt_token is None else 'present'}, no_auth_mode={is_no_auth_mode()}" f"get_user_opensearch_client: user_id={user_id}, jwt_token={'None' if jwt_token is None else 'present'}, no_auth_mode={is_no_auth_mode()}"
) )
# In no-auth mode, create anonymous JWT for OpenSearch DLS # In no-auth mode, create anonymous JWT for OpenSearch DLS
if is_no_auth_mode() and jwt_token is None: if is_no_auth_mode() and jwt_token is None:
if not hasattr(self, "_anonymous_jwt"): if not hasattr(self, "_anonymous_jwt"):
# Create anonymous JWT token for OpenSearch OIDC # Create anonymous JWT token for OpenSearch OIDC
print(f"[DEBUG] Creating anonymous JWT...") logger.debug("Creating anonymous JWT...")
self._anonymous_jwt = self._create_anonymous_jwt() self._anonymous_jwt = self._create_anonymous_jwt()
print(f"[DEBUG] Anonymous JWT created: {self._anonymous_jwt[:50]}...") logger.debug(f"Anonymous JWT created: {self._anonymous_jwt[:50]}...")
jwt_token = self._anonymous_jwt jwt_token = self._anonymous_jwt
print(f"[DEBUG] Using anonymous JWT for OpenSearch") logger.debug("Using anonymous JWT for OpenSearch")
# Check if we have a cached client for this user # Check if we have a cached client for this user
if user_id not in self.user_opensearch_clients: if user_id not in self.user_opensearch_clients:

View file

@ -13,7 +13,9 @@ from .managers.env_manager import EnvManager
from .managers.container_manager import ContainerManager from .managers.container_manager import ContainerManager
from .utils.platform import PlatformDetector from .utils.platform import PlatformDetector
from .widgets.diagnostics_notification import notify_with_diagnostics from .widgets.diagnostics_notification import notify_with_diagnostics
from utils.logging_config import get_logger
logger = get_logger(__name__)
class OpenRAGTUI(App): class OpenRAGTUI(App):
"""OpenRAG Terminal User Interface application.""" """OpenRAG Terminal User Interface application."""
@ -221,10 +223,10 @@ def run_tui():
app = OpenRAGTUI() app = OpenRAGTUI()
app.run() app.run()
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nOpenRAG TUI interrupted by user") logger.info("\nOpenRAG TUI interrupted by user")
sys.exit(0) sys.exit(0)
except Exception as e: except Exception as e:
print(f"Error running OpenRAG TUI: {e}") logger.error(f"Error running OpenRAG TUI: {e}")
sys.exit(1) sys.exit(1)

View file

@ -11,7 +11,9 @@ from typing import Dict, List, Optional, AsyncIterator
from ..utils.platform import PlatformDetector, RuntimeInfo, RuntimeType from ..utils.platform import PlatformDetector, RuntimeInfo, RuntimeType
from utils.gpu_detection import detect_gpu_devices from utils.gpu_detection import detect_gpu_devices
from utils.logging_config import get_logger
logger = get_logger(__name__)
class ServiceStatus(Enum): class ServiceStatus(Enum):
"""Container service status.""" """Container service status."""
@ -177,7 +179,7 @@ class ContainerManager:
def _process_service_json(self, service: Dict, services: Dict[str, ServiceInfo]) -> None: def _process_service_json(self, service: Dict, services: Dict[str, ServiceInfo]) -> None:
"""Process a service JSON object and add it to the services dict.""" """Process a service JSON object and add it to the services dict."""
# Debug print to see the actual service data # Debug print to see the actual service data
print(f"DEBUG: Processing service data: {json.dumps(service, indent=2)}") logger.debug(f"Processing service data: {json.dumps(service, indent=2)}")
container_name = service.get("Name", "") container_name = service.get("Name", "")

View file

@ -16,7 +16,9 @@ from ..utils.validation import (
validate_documents_paths, validate_documents_paths,
sanitize_env_value sanitize_env_value
) )
from utils.logging_config import get_logger
logger = get_logger(__name__)
@dataclass @dataclass
class EnvConfig: class EnvConfig:
@ -115,7 +117,7 @@ class EnvManager:
return True return True
except Exception as e: except Exception as e:
print(f"Error loading .env file: {e}") logger.error(f"Error loading .env file: {e}")
return False return False
def setup_secure_defaults(self) -> None: def setup_secure_defaults(self) -> None:
@ -245,7 +247,7 @@ class EnvManager:
return True return True
except Exception as e: except Exception as e:
print(f"Error saving .env file: {e}") logger.error(f"Error saving .env file: {e}")
return False return False
def get_no_auth_setup_fields(self) -> List[tuple[str, str, str, bool]]: def get_no_auth_setup_fields(self) -> List[tuple[str, str, str, bool]]:

View file

@ -2,6 +2,9 @@ import hashlib
import os import os
from collections import defaultdict from collections import defaultdict
from .gpu_detection import detect_gpu_devices from .gpu_detection import detect_gpu_devices
from utils.logging_config import get_logger
logger = get_logger(__name__)
# Global converter cache for worker processes # Global converter cache for worker processes
_worker_converter = None _worker_converter = None
@ -37,11 +40,11 @@ def get_worker_converter():
"1" # Still disable progress bars "1" # Still disable progress bars
) )
print( logger.info(
f"[WORKER {os.getpid()}] Initializing DocumentConverter in worker process" f"[WORKER {os.getpid()}] Initializing DocumentConverter in worker process"
) )
_worker_converter = DocumentConverter() _worker_converter = DocumentConverter()
print(f"[WORKER {os.getpid()}] DocumentConverter ready in worker process") logger.info(f"[WORKER {os.getpid()}] DocumentConverter ready in worker process")
return _worker_converter return _worker_converter
@ -118,33 +121,33 @@ def process_document_sync(file_path: str):
start_memory = process.memory_info().rss / 1024 / 1024 # MB start_memory = process.memory_info().rss / 1024 / 1024 # MB
try: try:
print(f"[WORKER {os.getpid()}] Starting document processing: {file_path}") logger.info(f"[WORKER {os.getpid()}] Starting document processing: {file_path}")
print(f"[WORKER {os.getpid()}] Initial memory usage: {start_memory:.1f} MB") logger.info(f"[WORKER {os.getpid()}] Initial memory usage: {start_memory:.1f} MB")
# Check file size # Check file size
try: try:
file_size = os.path.getsize(file_path) / 1024 / 1024 # MB file_size = os.path.getsize(file_path) / 1024 / 1024 # MB
print(f"[WORKER {os.getpid()}] File size: {file_size:.1f} MB") logger.info(f"[WORKER {os.getpid()}] File size: {file_size:.1f} MB")
except OSError as e: except OSError as e:
print(f"[WORKER {os.getpid()}] WARNING: Cannot get file size: {e}") logger.warning(f"[WORKER {os.getpid()}] WARNING: Cannot get file size: {e}")
file_size = 0 file_size = 0
# Get the cached converter for this worker # Get the cached converter for this worker
try: try:
print(f"[WORKER {os.getpid()}] Getting document converter...") logger.info(f"[WORKER {os.getpid()}] Getting document converter...")
converter = get_worker_converter() converter = get_worker_converter()
memory_after_converter = process.memory_info().rss / 1024 / 1024 memory_after_converter = process.memory_info().rss / 1024 / 1024
print( logger.info(
f"[WORKER {os.getpid()}] Memory after converter init: {memory_after_converter:.1f} MB" f"[WORKER {os.getpid()}] Memory after converter init: {memory_after_converter:.1f} MB"
) )
except Exception as e: except Exception as e:
print(f"[WORKER {os.getpid()}] ERROR: Failed to initialize converter: {e}") logger.error(f"[WORKER {os.getpid()}] ERROR: Failed to initialize converter: {e}")
traceback.print_exc() traceback.print_exc()
raise raise
# Compute file hash # Compute file hash
try: try:
print(f"[WORKER {os.getpid()}] Computing file hash...") logger.info(f"[WORKER {os.getpid()}] Computing file hash...")
sha256 = hashlib.sha256() sha256 = hashlib.sha256()
with open(file_path, "rb") as f: with open(file_path, "rb") as f:
while True: while True:
@ -153,39 +156,39 @@ def process_document_sync(file_path: str):
break break
sha256.update(chunk) sha256.update(chunk)
file_hash = sha256.hexdigest() file_hash = sha256.hexdigest()
print(f"[WORKER {os.getpid()}] File hash computed: {file_hash[:12]}...") logger.info(f"[WORKER {os.getpid()}] File hash computed: {file_hash[:12]}...")
except Exception as e: except Exception as e:
print(f"[WORKER {os.getpid()}] ERROR: Failed to compute file hash: {e}") logger.error(f"[WORKER {os.getpid()}] ERROR: Failed to compute file hash: {e}")
traceback.print_exc() traceback.print_exc()
raise raise
# Convert with docling # Convert with docling
try: try:
print(f"[WORKER {os.getpid()}] Starting docling conversion...") logger.info(f"[WORKER {os.getpid()}] Starting docling conversion...")
memory_before_convert = process.memory_info().rss / 1024 / 1024 memory_before_convert = process.memory_info().rss / 1024 / 1024
print( logger.info(
f"[WORKER {os.getpid()}] Memory before conversion: {memory_before_convert:.1f} MB" f"[WORKER {os.getpid()}] Memory before conversion: {memory_before_convert:.1f} MB"
) )
result = converter.convert(file_path) result = converter.convert(file_path)
memory_after_convert = process.memory_info().rss / 1024 / 1024 memory_after_convert = process.memory_info().rss / 1024 / 1024
print( logger.info(
f"[WORKER {os.getpid()}] Memory after conversion: {memory_after_convert:.1f} MB" f"[WORKER {os.getpid()}] Memory after conversion: {memory_after_convert:.1f} MB"
) )
print(f"[WORKER {os.getpid()}] Docling conversion completed") logger.info(f"[WORKER {os.getpid()}] Docling conversion completed")
full_doc = result.document.export_to_dict() full_doc = result.document.export_to_dict()
memory_after_export = process.memory_info().rss / 1024 / 1024 memory_after_export = process.memory_info().rss / 1024 / 1024
print( logger.info(
f"[WORKER {os.getpid()}] Memory after export: {memory_after_export:.1f} MB" f"[WORKER {os.getpid()}] Memory after export: {memory_after_export:.1f} MB"
) )
except Exception as e: except Exception as e:
print( logger.error(
f"[WORKER {os.getpid()}] ERROR: Failed during docling conversion: {e}" f"[WORKER {os.getpid()}] ERROR: Failed during docling conversion: {e}"
) )
print( logger.info(
f"[WORKER {os.getpid()}] Current memory usage: {process.memory_info().rss / 1024 / 1024:.1f} MB" f"[WORKER {os.getpid()}] Current memory usage: {process.memory_info().rss / 1024 / 1024:.1f} MB"
) )
traceback.print_exc() traceback.print_exc()
@ -193,10 +196,10 @@ def process_document_sync(file_path: str):
# Extract relevant content (same logic as extract_relevant) # Extract relevant content (same logic as extract_relevant)
try: try:
print(f"[WORKER {os.getpid()}] Extracting relevant content...") logger.info(f"[WORKER {os.getpid()}] Extracting relevant content...")
origin = full_doc.get("origin", {}) origin = full_doc.get("origin", {})
texts = full_doc.get("texts", []) texts = full_doc.get("texts", [])
print(f"[WORKER {os.getpid()}] Found {len(texts)} text fragments") logger.info(f"[WORKER {os.getpid()}] Found {len(texts)} text fragments")
page_texts = defaultdict(list) page_texts = defaultdict(list)
for txt in texts: for txt in texts:
@ -210,12 +213,12 @@ def process_document_sync(file_path: str):
joined = "\n".join(page_texts[page]) joined = "\n".join(page_texts[page])
chunks.append({"page": page, "text": joined}) chunks.append({"page": page, "text": joined})
print( logger.info(
f"[WORKER {os.getpid()}] Created {len(chunks)} chunks from {len(page_texts)} pages" f"[WORKER {os.getpid()}] Created {len(chunks)} chunks from {len(page_texts)} pages"
) )
except Exception as e: except Exception as e:
print( logger.error(
f"[WORKER {os.getpid()}] ERROR: Failed during content extraction: {e}" f"[WORKER {os.getpid()}] ERROR: Failed during content extraction: {e}"
) )
traceback.print_exc() traceback.print_exc()
@ -223,8 +226,8 @@ def process_document_sync(file_path: str):
final_memory = process.memory_info().rss / 1024 / 1024 final_memory = process.memory_info().rss / 1024 / 1024
memory_delta = final_memory - start_memory memory_delta = final_memory - start_memory
print(f"[WORKER {os.getpid()}] Document processing completed successfully") logger.info(f"[WORKER {os.getpid()}] Document processing completed successfully")
print( logger.info(
f"[WORKER {os.getpid()}] Final memory: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)" f"[WORKER {os.getpid()}] Final memory: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)"
) )
@ -239,24 +242,24 @@ def process_document_sync(file_path: str):
except Exception as e: except Exception as e:
final_memory = process.memory_info().rss / 1024 / 1024 final_memory = process.memory_info().rss / 1024 / 1024
memory_delta = final_memory - start_memory memory_delta = final_memory - start_memory
print(f"[WORKER {os.getpid()}] FATAL ERROR in process_document_sync") logger.error(f"[WORKER {os.getpid()}] FATAL ERROR in process_document_sync")
print(f"[WORKER {os.getpid()}] File: {file_path}") logger.info(f"[WORKER {os.getpid()}] File: {file_path}")
print(f"[WORKER {os.getpid()}] Python version: {sys.version}") logger.info(f"[WORKER {os.getpid()}] Python version: {sys.version}")
print( logger.info(
f"[WORKER {os.getpid()}] Memory at crash: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)" f"[WORKER {os.getpid()}] Memory at crash: {final_memory:.1f} MB (Delta +{memory_delta:.1f} MB)"
) )
print(f"[WORKER {os.getpid()}] Error: {type(e).__name__}: {e}") logger.error(f"[WORKER {os.getpid()}] Error: {type(e).__name__}: {e}")
print(f"[WORKER {os.getpid()}] Full traceback:") logger.error(f"[WORKER {os.getpid()}] Full traceback:")
traceback.print_exc() traceback.print_exc()
# Try to get more system info before crashing # Try to get more system info before crashing
try: try:
import platform import platform
print( logger.info(
f"[WORKER {os.getpid()}] System: {platform.system()} {platform.release()}" f"[WORKER {os.getpid()}] System: {platform.system()} {platform.release()}"
) )
print(f"[WORKER {os.getpid()}] Architecture: {platform.machine()}") logger.info(f"[WORKER {os.getpid()}] Architecture: {platform.machine()}")
except: except:
pass pass

View file

@ -1,6 +1,8 @@
import multiprocessing import multiprocessing
import os import os
from utils.logging_config import get_logger
logger = get_logger(__name__)
def detect_gpu_devices(): def detect_gpu_devices():
"""Detect if GPU devices are actually available""" """Detect if GPU devices are actually available"""
@ -30,12 +32,12 @@ def get_worker_count():
if has_gpu_devices: if has_gpu_devices:
default_workers = min(4, multiprocessing.cpu_count() // 2) default_workers = min(4, multiprocessing.cpu_count() // 2)
print( logger.info(
f"GPU mode enabled with {gpu_count} GPU(s) - using limited concurrency ({default_workers} workers)" f"GPU mode enabled with {gpu_count} GPU(s) - using limited concurrency ({default_workers} workers)"
) )
else: else:
default_workers = multiprocessing.cpu_count() default_workers = multiprocessing.cpu_count()
print( logger.info(
f"CPU-only mode enabled - using full concurrency ({default_workers} workers)" f"CPU-only mode enabled - using full concurrency ({default_workers} workers)"
) )

View file

@ -1,10 +1,13 @@
import os import os
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ProcessPoolExecutor
from utils.gpu_detection import get_worker_count from utils.gpu_detection import get_worker_count
from utils.logging_config import get_logger
logger = get_logger(__name__)
# Create shared process pool at import time (before CUDA initialization) # Create shared process pool at import time (before CUDA initialization)
# This avoids the "Cannot re-initialize CUDA in forked subprocess" error # This avoids the "Cannot re-initialize CUDA in forked subprocess" error
MAX_WORKERS = get_worker_count() MAX_WORKERS = get_worker_count()
process_pool = ProcessPoolExecutor(max_workers=MAX_WORKERS) process_pool = ProcessPoolExecutor(max_workers=MAX_WORKERS)
print(f"Shared process pool initialized with {MAX_WORKERS} workers") logger.info(f"Shared process pool initialized with {MAX_WORKERS} workers")

View file

@ -1,13 +1,16 @@
from docling.document_converter import DocumentConverter from docling.document_converter import DocumentConverter
from utils.logging_config import get_logger
print("Warming up docling models...") logger = get_logger(__name__)
logger.info("Warming up docling models...")
try: try:
# Use the sample document to warm up docling # Use the sample document to warm up docling
test_file = "/app/warmup_ocr.pdf" test_file = "/app/warmup_ocr.pdf"
print(f"Using {test_file} to warm up docling...") logger.info(f"Using {test_file} to warm up docling...")
DocumentConverter().convert(test_file) DocumentConverter().convert(test_file)
print("Docling models warmed up successfully") logger.info("Docling models warmed up successfully")
except Exception as e: except Exception as e:
print(f"Docling warm-up completed with: {e}") logger.error(f"Docling warm-up completed with: {e}")
# This is expected - we just want to trigger the model downloads # This is expected - we just want to trigger the model downloads